diff --git a/scripts/hitl-bo/check_persistence.py b/scripts/hitl-bo/check_persistence.py new file mode 100644 index 00000000..e54daa6e --- /dev/null +++ b/scripts/hitl-bo/check_persistence.py @@ -0,0 +1,123 @@ +# Manual test for MongoDB persistence +# Run this to check the state before and after manually killing mongodbintegrationmvp.py + +from pymongo import MongoClient +from datetime import datetime +import json + +def check_mongodb_status(): + """Check MongoDB connection and experiment status.""" + try: + client = MongoClient("mongodb://localhost:27017/", serverSelectionTimeoutMS=2000) + client.admin.command("ping") + print("โœ… MongoDB is running and accessible") + + db = client["ax_db"] + snapshots_col = db["ax_snapshots"] + + experiment_name = "branin_experiment_k7m9" + + # Get all snapshots for this experiment + snapshots = list(snapshots_col.find( + {"experiment_name": experiment_name} + ).sort("timestamp", -1).limit(5)) + + if snapshots: + print(f"\n๐Ÿ“Š Found {len(snapshots)} recent snapshots for '{experiment_name}':") + for i, snapshot in enumerate(snapshots): + timestamp = snapshot['timestamp'] + trial_count = snapshot['trial_count'] + snapshot_id = str(snapshot['_id'])[:8] + "..." + print(f" {i+1}. {timestamp} | {trial_count} trials | ID: {snapshot_id}") + + # Show details of most recent + latest = snapshots[0] + print(f"\n๐Ÿ” Most recent snapshot details:") + print(f" Timestamp: {latest['timestamp']}") + print(f" Trial count: {latest['trial_count']}") + print(f" Document ID: {latest['_id']}") + + return latest['trial_count'] + else: + print(f"๐Ÿ“Š No snapshots found for experiment '{experiment_name}'") + return 0 + + except Exception as e: + print(f"โŒ Error: {e}") + return None + +def cleanup_old_experiments(): + """Optional: Clean up old experiment data.""" + try: + client = MongoClient("mongodb://localhost:27017/", serverSelectionTimeoutMS=2000) + db = client["ax_db"] + snapshots_col = db["ax_snapshots"] + + experiment_name = "branin_experiment_k7m9" + + count = snapshots_col.count_documents({"experiment_name": experiment_name}) + print(f"\n๐Ÿ—‘๏ธ Found {count} total snapshots for '{experiment_name}'") + + if count > 0: + choice = input("Do you want to delete all snapshots? (y/N): ").strip().lower() + if choice == 'y': + result = snapshots_col.delete_many({"experiment_name": experiment_name}) + print(f" Deleted {result.deleted_count} snapshots") + return True + + return False + + except Exception as e: + print(f"โŒ Error during cleanup: {e}") + return False + +def main(): + print("=" * 60) + print("๐Ÿ” MONGODB PERSISTENCE CHECKER") + print("=" * 60) + print("This tool helps you manually test the persistence of mongodbintegrationmvp.py") + print("") + print("How to use:") + print("1. Run this script to check current state") + print("2. Start mongodbintegrationmvp.py and let it run a few trials") + print("3. Kill mongodbintegrationmvp.py (Ctrl+C or close terminal)") + print("4. Run this script again to verify data was saved") + print("5. Start mongodbintegrationmvp.py again to see it resume") + print("") + + while True: + print("\nChoose an option:") + print("1. Check current experiment status") + print("2. Clean up old experiments (delete all data)") + print("3. Exit") + + choice = input("\nEnter choice (1-3): ").strip() + + if choice == "1": + print("\n" + "-" * 40) + trial_count = check_mongodb_status() + print("-" * 40) + + if trial_count is not None: + if trial_count == 0: + print("\n๐Ÿ’ก No trials found. You can now:") + print(" - Start mongodbintegrationmvp.py to begin a new experiment") + else: + print(f"\n๐Ÿ’ก Found {trial_count} trials. You can now:") + print(" - Start mongodbintegrationmvp.py to resume the experiment") + print(" - Or kill it partway through to test persistence") + + elif choice == "2": + print("\n" + "-" * 40) + cleanup_old_experiments() + print("-" * 40) + + elif choice == "3": + print("\n๐Ÿ‘‹ Goodbye!") + break + + else: + print("โŒ Invalid choice. Please enter 1, 2, or 3.") + +if __name__ == "__main__": + main() diff --git a/scripts/hitl-bo/mongodbintegration.py b/scripts/hitl-bo/mongodbintegration.py new file mode 100644 index 00000000..b34a12e5 --- /dev/null +++ b/scripts/hitl-bo/mongodbintegration.py @@ -0,0 +1,254 @@ +# Generated by Honegumi (https://arxiv.org/abs/2502.06815) +# pip install ax-platform==0.4.3 numpy pymongo +import json +import os +from datetime import datetime +import random +import string + +import numpy as np +from ax.modelbridge.generation_strategy import GenerationStep, GenerationStrategy +from ax.modelbridge.registry import Models +from ax.service.ax_client import AxClient, ObjectiveProperties +from pymongo import MongoClient, errors + +obj1_name = "branin" +MAX_TRIALS = 19 # Configuration constant + +# These will be set based on user choice +experiment_id = None +db_name = None + + +def branin(x1, x2): + """Branin function - a common benchmark for optimization.""" + y = float( + (x2 - 5.1 / (4 * np.pi**2) * x1**2 + 5.0 / np.pi * x1 - 6.0) ** 2 + + 10 * (1 - 1.0 / (8 * np.pi)) * np.cos(x1) + + 10 + ) + return y + + +def generate_random_id(length=4): + """Generate a random alphanumeric ID.""" + return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length)) + + +def get_user_choice(): + """Ask user whether to continue previous experiment or start new one.""" + print("\n" + "="*50) + print("EXPERIMENT SETUP") + print("="*50) + print("Choose an option:") + print("1. Continue previous experiment (use existing database)") + print("2. Start new experiment (create new database)") + + while True: + choice = input("\nEnter your choice (1 or 2): ").strip() + if choice == "1": + return "continue" + elif choice == "2": + return "new" + else: + print("Invalid choice. Please enter 1 or 2.") + + +def setup_experiment_config(choice): + """Set up experiment configuration based on user choice.""" + global experiment_id, db_name + + if choice == "continue": + # Use default previous experiment settings + experiment_id = f"{obj1_name}_experiment_k7m9" + db_name = "ax_db" + print(f"\nContinuing previous experiment...") + print(f"Database: {db_name}") + print(f"Experiment ID: {experiment_id}") + else: + # Create new experiment with random ID + random_id = generate_random_id() + experiment_id = f"{obj1_name}_experiment_{random_id}" + db_name = f"ax_db_{random_id}" + print(f"\nStarting new experiment...") + print(f"Database: {db_name}") + print(f"Experiment ID: {experiment_id}") + + return experiment_id, db_name + + +# Get user choice and setup configuration +user_choice = get_user_choice() +experiment_id, db_name = setup_experiment_config(user_choice) + + +# Connect to MongoDB +mongo_client = MongoClient( + "mongodb://localhost:27017/", serverSelectionTimeoutMS=5000 +) +# Test the connection +mongo_client.admin.command("ping") +db = mongo_client[db_name] # Use dynamic database name +snapshots_col = db["ax_snapshots"] # Collection for storing JSON snapshots +print(f"Connected to MongoDB successfully (Database: {db_name})") + +# Experiment configuration +parameters = [ + {"name": "x1", "type": "range", "bounds": [-5.0, 10.0]}, + {"name": "x2", "type": "range", "bounds": [0.0, 10.0]}, +] +objectives = {obj1_name: ObjectiveProperties(minimize=True)} + + +def save_ax_snapshot_to_mongodb(ax_client, experiment_name): + """Save Ax client snapshot to MongoDB with timestamp (append, don't overwrite).""" + try: + # Insert document first to get unique ID + snapshot_doc = { + "experiment_name": experiment_name, + "snapshot_data": {}, # Placeholder, will be updated + "timestamp": datetime.now().isoformat(), + "trial_count": ( + len(ax_client.get_trials_data_frame()) + if ax_client.get_trials_data_frame() is not None + else 0 + ), + } + + # Insert a new document for every snapshot (no overwrite) + result = snapshots_col.insert_one(snapshot_doc) + + # Use database ID in temp filename to avoid conflicts + temp_file = f"temp_{experiment_name}_{result.inserted_id}_snapshot.json" + ax_client.save_to_json_file(temp_file) + + with open(temp_file, "r") as f: + snapshot_data = json.load(f) + + # Update the document with actual snapshot data + snapshots_col.update_one( + {"_id": result.inserted_id}, + {"$set": {"snapshot_data": snapshot_data}} + ) + + os.remove(temp_file) + + print(f"Snapshot saved to MongoDB at {snapshot_doc['timestamp']} (ID: {result.inserted_id})") + return result.inserted_id + + except Exception as e: + print(f"Error saving snapshot: {e}") + return None + + +def load_ax_snapshot_from_mongodb(experiment_name): + """Load most recent Ax client snapshot from MongoDB.""" + try: + # Find the most recent snapshot + record = snapshots_col.find_one( + {"experiment_name": experiment_name}, + sort=[("timestamp", -1)], # Most recent first + ) + + if record: + # Use database ID in temp filename to avoid conflicts + temp_file = f"temp_{experiment_name}_{record['_id']}_snapshot.json" + with open(temp_file, "w") as f: + json.dump(record["snapshot_data"], f) + + # Load AxClient from file + ax_client = AxClient.load_from_json_file(temp_file) + + # Clean up temp file + os.remove(temp_file) + + print( + f"Loaded snapshot from {record['timestamp']} with " + f"{record['trial_count']} trials" + ) + return ax_client + else: + print("No existing snapshot found") + return None + + except Exception as e: + print(f"Error loading snapshot: {e}") + return None + + +# Load existing experiment or create new one +ax_client = load_ax_snapshot_from_mongodb(experiment_id) + +if ax_client is None: + # Create new experiment (Ax will use default generation strategy) + ax_client = AxClient() + ax_client.create_experiment( + name=experiment_id, parameters=parameters, objectives=objectives + ) + print(f"Created new experiment '{experiment_id}' with default generation strategy") + + # Save initial snapshot + save_ax_snapshot_to_mongodb(ax_client, experiment_id) +else: + print(f"Resuming existing experiment '{experiment_id}'") + +# Get current trial count to determine how many more trials to run +current_trials = ax_client.get_trials_data_frame() +start_trial = len(current_trials) if current_trials is not None else 0 + +print(f"Starting optimization: running trials {start_trial} to {MAX_TRIALS-1}") + +for i in range(start_trial, MAX_TRIALS): + # Get next trial + parameterization, trial_index = ax_client.get_next_trial() + + # Extract parameters + x1 = parameterization["x1"] + x2 = parameterization["x2"] + + print(f"Trial {trial_index}: x1={x1:.3f}, x2={x2:.3f}") + + # Save snapshot before running experiment (preserves pending trial) + save_ax_snapshot_to_mongodb(ax_client, experiment_id) + + # Evaluate objective function + results = branin(x1, x2) + + # Format raw_data as expected by AxClient + raw_data = {obj1_name: results} + + # Complete trial + ax_client.complete_trial(trial_index=trial_index, raw_data=raw_data) + + # Save snapshot after completing trial + save_ax_snapshot_to_mongodb(ax_client, experiment_id) + + # Get current best for progress tracking + best_parameters, best_metrics = ax_client.get_best_parameters() + best_value = best_metrics[0][obj1_name] + print( + f"Trial {trial_index}: result={results:.3f} | " + f"Best so far: {best_value:.3f}" + ) + +print("\nOptimization completed!") +best_parameters, best_metrics = ax_client.get_best_parameters() +print(f"Best parameters: {best_parameters}") +print(f"Best metrics: {best_metrics}") + +# Save final snapshot +save_ax_snapshot_to_mongodb(ax_client, experiment_id) + +# Print experiment summary +trials_df = ax_client.get_trials_data_frame() +if trials_df is not None: + print(f"Total trials completed: {len(trials_df)}") + print(f"Best objective value: {trials_df[obj1_name].min():.6f}") + +# Clean up MongoDB connection +mongo_client.close() +print("MongoDB connection closed") + +# Optional: Display trials data frame for debugging +print("\nTrials Summary:") +print(ax_client.get_trials_data_frame()) \ No newline at end of file diff --git a/scripts/hitl-bo/mongodbintegrationmvp.py b/scripts/hitl-bo/mongodbintegrationmvp.py new file mode 100644 index 00000000..43292b97 --- /dev/null +++ b/scripts/hitl-bo/mongodbintegrationmvp.py @@ -0,0 +1,205 @@ +# Generated by Honegumi (https://arxiv.org/abs/2502.06815) +# pip install ax-platform==0.4.3 numpy pymongo +import json +import os +import time +from datetime import datetime + +import numpy as np +from ax.modelbridge.generation_strategy import GenerationStep, GenerationStrategy +from ax.modelbridge.registry import Models +from ax.service.ax_client import AxClient, ObjectiveProperties +from pymongo import MongoClient, errors + +obj1_name = "branin" +MAX_TRIALS = 19 # Configuration constant + +# Experiment identifier (separate from objective name) with hardcoded unique ID +experiment_id = f"{obj1_name}_experiment_k7m9" + + +def branin(x1, x2): + """Branin function - a common benchmark for optimization.""" + y = float( + (x2 - 5.1 / (4 * np.pi**2) * x1**2 + 5.0 / np.pi * x1 - 6.0) ** 2 + + 10 * (1 - 1.0 / (8 * np.pi)) * np.cos(x1) + + 10 + ) + return y + + +# Connect to MongoDB +mongo_client = MongoClient( + "mongodb://localhost:27017/", serverSelectionTimeoutMS=5000 +) +# Test the connection +mongo_client.admin.command("ping") +db = mongo_client["ax_db"] +snapshots_col = db["ax_snapshots"] # Collection for storing JSON snapshots +print("Connected to MongoDB successfully") + +# Experiment configuration +parameters = [ + {"name": "x1", "type": "range", "bounds": [-5.0, 10.0]}, + {"name": "x2", "type": "range", "bounds": [0.0, 10.0]}, +] +objectives = {obj1_name: ObjectiveProperties(minimize=True)} + + +def save_ax_snapshot_to_mongodb(ax_client, experiment_name): + """Save Ax client snapshot to MongoDB with timestamp (append, don't overwrite).""" + try: + # Insert document first to get unique ID + snapshot_doc = { + "experiment_name": experiment_name, + "snapshot_data": {}, # Placeholder, will be updated + "timestamp": datetime.now().isoformat(), + "trial_count": ( + len(ax_client.get_trials_data_frame()) + if ax_client.get_trials_data_frame() is not None + else 0 + ), + } + + # Insert a new document for every snapshot (no overwrite) + result = snapshots_col.insert_one(snapshot_doc) + + # Use database ID in temp filename to avoid conflicts + temp_file = f"temp_{experiment_name}_{result.inserted_id}_snapshot.json" + ax_client.save_to_json_file(temp_file) + + with open(temp_file, "r") as f: + snapshot_data = json.load(f) + + # Update the document with actual snapshot data + snapshots_col.update_one( + {"_id": result.inserted_id}, + {"$set": {"snapshot_data": snapshot_data}} + ) + + os.remove(temp_file) + + print(f"Snapshot saved to MongoDB at {snapshot_doc['timestamp']} (ID: {result.inserted_id})") + return result.inserted_id + + except Exception as e: + print(f"Error saving snapshot: {e}") + return None + + +def load_ax_snapshot_from_mongodb(experiment_name): + """Load most recent Ax client snapshot from MongoDB.""" + try: + # Find the most recent snapshot + record = snapshots_col.find_one( + {"experiment_name": experiment_name}, + sort=[("timestamp", -1)], # Most recent first + ) + + if record: + # Use database ID in temp filename to avoid conflicts + temp_file = f"temp_{experiment_name}_{record['_id']}_snapshot.json" + with open(temp_file, "w") as f: + json.dump(record["snapshot_data"], f) + + # Load AxClient from file + ax_client = AxClient.load_from_json_file(temp_file) + + # Clean up temp file + os.remove(temp_file) + + print( + f"Loaded snapshot from {record['timestamp']} with " + f"{record['trial_count']} trials" + ) + return ax_client + else: + print("No existing snapshot found") + return None + + except Exception as e: + print(f"Error loading snapshot: {e}") + return None + + +# Load existing experiment or create new one +ax_client = load_ax_snapshot_from_mongodb(experiment_id) + +if ax_client is None: + # Create new experiment (Ax will use default generation strategy) + ax_client = AxClient() + ax_client.create_experiment( + name=experiment_id, parameters=parameters, objectives=objectives + ) + print("Created new experiment with default generation strategy") + + # Save initial snapshot + save_ax_snapshot_to_mongodb(ax_client, experiment_id) +else: + print("Resuming existing experiment") + +# Get current trial count to determine how many more trials to run +current_trials = ax_client.get_trials_data_frame() +start_trial = len(current_trials) if current_trials is not None else 0 + +print(f"Starting optimization: running trials {start_trial} to {MAX_TRIALS-1}") + +for i in range(start_trial, MAX_TRIALS): + # Get next trial + parameterization, trial_index = ax_client.get_next_trial() + + # Extract parameters + x1 = parameterization["x1"] + x2 = parameterization["x2"] + + print(f"Trial {trial_index}: x1={x1:.3f}, x2={x2:.3f}") + + # Add delay for manual testing - gives time to kill the process + print("โณ Starting trial evaluation in 3 seconds... (Press Ctrl+C to simulate kernel kill)") + time.sleep(3) + + # Save snapshot before running experiment (preserves pending trial) + save_ax_snapshot_to_mongodb(ax_client, experiment_id) + + # Evaluate objective function + results = branin(x1, x2) + + # Format raw_data as expected by AxClient + raw_data = {obj1_name: results} + + # Complete trial + ax_client.complete_trial(trial_index=trial_index, raw_data=raw_data) + + # Save snapshot after completing trial + save_ax_snapshot_to_mongodb(ax_client, experiment_id) + + # Get current best for progress tracking + best_parameters, best_metrics = ax_client.get_best_parameters() + best_value = best_metrics[0][obj1_name] + print( + f"Trial {trial_index}: result={results:.3f} | " + f"Best so far: {best_value:.3f}" + ) + + # Add pause after trial completion for manual testing + print("โœ… Trial completed! Waiting 2 seconds before next trial... (Press Ctrl+C to test recovery)") + time.sleep(2) + print("=" * 50) + +print("\nOptimization completed!") +best_parameters, best_metrics = ax_client.get_best_parameters() +print(f"Best parameters: {best_parameters}") +print(f"Best metrics: {best_metrics}") + +save_ax_snapshot_to_mongodb(ax_client, experiment_id) + +trials_df = ax_client.get_trials_data_frame() +if trials_df is not None: + print(f"Total trials completed: {len(trials_df)}") + print(f"Best objective value: {trials_df[obj1_name].min():.6f}") + +mongo_client.close() +print("MongoDB connection closed") + +print("\nTrials Summary:") +print(ax_client.get_trials_data_frame()) \ No newline at end of file diff --git a/scripts/hitl-bo/simple_test.py b/scripts/hitl-bo/simple_test.py new file mode 100644 index 00000000..1c95be85 --- /dev/null +++ b/scripts/hitl-bo/simple_test.py @@ -0,0 +1,116 @@ +# Simple test to demonstrate MongoDB persistence +# This script will run mongodbintegrationmvp.py twice to show persistence + +import subprocess +import sys +import time +from pymongo import MongoClient + +def check_experiment_trials(): + """Check how many trials are in the database.""" + try: + client = MongoClient("mongodb://localhost:27017/", serverSelectionTimeoutMS=2000) + db = client["ax_db"] + snapshots_col = db["ax_snapshots"] + + experiment_name = "branin_experiment_k7m9" + record = snapshots_col.find_one( + {"experiment_name": experiment_name}, + sort=[("timestamp", -1)] + ) + + if record: + return record['trial_count'] + return 0 + except: + return 0 + +def run_experiment_limited(max_trials=5): + """Run the experiment but modify MAX_TRIALS temporarily.""" + print(f"๐Ÿš€ Running experiment with MAX_TRIALS={max_trials}") + + # Read the original file + with open("mongodbintegrationmvp.py", "r") as f: + original_content = f.read() + + # Create a temporary modified version + modified_content = original_content.replace( + 'MAX_TRIALS = 19', + f'MAX_TRIALS = {max_trials}' + ) + + with open("temp_experiment.py", "w") as f: + f.write(modified_content) + + try: + # Run the modified experiment + result = subprocess.run([ + sys.executable, "temp_experiment.py" + ], capture_output=True, text=True, timeout=30) + + print("๐Ÿ“ Output:") + for line in result.stdout.split('\n'): + if line.strip() and any(keyword in line for keyword in + ["Trial", "Best", "Connected", "Created", "Resuming", "completed"]): + print(f" {line}") + + return result.returncode == 0 + + except subprocess.TimeoutExpired: + print("โฐ Experiment timed out (might be normal)") + return True + finally: + # Clean up temp file + try: + import os + os.remove("temp_experiment.py") + except: + pass + +def main(): + print("=" * 50) + print("๐Ÿงช TESTING MONGODB PERSISTENCE") + print("=" * 50) + + # Check MongoDB connection + try: + client = MongoClient("mongodb://localhost:27017/", serverSelectionTimeoutMS=2000) + client.admin.command("ping") + print("โœ… MongoDB is running") + except: + print("โŒ MongoDB is not running. Please start MongoDB first.") + return + + print("\n๐Ÿ“‹ STEP 1: Check initial state") + initial_trials = check_experiment_trials() + print(f" Initial trials in database: {initial_trials}") + + print("\n๐Ÿ“‹ STEP 2: Run experiment (first run)") + run_experiment_limited(max_trials=initial_trials + 3) + time.sleep(1) + + after_first_run = check_experiment_trials() + print(f" Trials after first run: {after_first_run}") + + print("\n๐Ÿ“‹ STEP 3: Run experiment again (second run - should resume)") + run_experiment_limited(max_trials=after_first_run + 2) + time.sleep(1) + + final_trials = check_experiment_trials() + print(f" Trials after second run: {final_trials}") + + print("\n" + "=" * 50) + print("๐Ÿ“Š RESULTS") + print("=" * 50) + print(f"Initial: {initial_trials} trials") + print(f"After 1st: {after_first_run} trials") + print(f"After 2nd: {final_trials} trials") + + if final_trials > after_first_run >= initial_trials: + print("\n๐ŸŽ‰ SUCCESS! Experiment resumed correctly from MongoDB") + print(" This proves the kernel can be 'killed' and experiment continues") + else: + print("\nโŒ Something went wrong with persistence") + +if __name__ == "__main__": + main() diff --git a/scripts/hitl-bo/test_persistence.py b/scripts/hitl-bo/test_persistence.py new file mode 100644 index 00000000..7b61eb29 --- /dev/null +++ b/scripts/hitl-bo/test_persistence.py @@ -0,0 +1,169 @@ +# Test script to simulate kernel interruption and recovery +# This demonstrates MongoDB persistence in mongodbintegrationmvp.py + +import subprocess +import sys +import time +import signal +import os +from pymongo import MongoClient + +def check_mongodb_connection(): + """Check if MongoDB is running and accessible.""" + try: + client = MongoClient("mongodb://localhost:27017/", serverSelectionTimeoutMS=2000) + client.admin.command("ping") + print("โœ… MongoDB is running and accessible") + return True + except Exception as e: + print(f"โŒ MongoDB connection failed: {e}") + return False + +def get_experiment_status(): + """Check current experiment status in MongoDB.""" + try: + client = MongoClient("mongodb://localhost:27017/", serverSelectionTimeoutMS=2000) + db = client["ax_db"] + snapshots_col = db["ax_snapshots"] + + # Find the most recent snapshot for the default experiment + experiment_name = "branin_experiment_k7m9" + record = snapshots_col.find_one( + {"experiment_name": experiment_name}, + sort=[("timestamp", -1)] + ) + + if record: + print(f"๐Ÿ“Š Found experiment '{experiment_name}' with {record['trial_count']} trials") + print(f" Last updated: {record['timestamp']}") + return record['trial_count'] + else: + print(f"๐Ÿ“Š No existing experiment '{experiment_name}' found") + return 0 + except Exception as e: + print(f"โŒ Error checking experiment status: {e}") + return 0 + +def run_experiment_with_interruption(trials_before_kill=3): + """Run the experiment and kill it after a few trials.""" + print(f"\n๐Ÿš€ Starting experiment (will be killed after ~{trials_before_kill} trials)...") + + # Start the experiment as a subprocess + process = subprocess.Popen([ + sys.executable, "mongodbintegrationmvp.py" + ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1) + + trial_count = 0 + output_lines = [] + + try: + while True: + output = process.stdout.readline() + if output == '' and process.poll() is not None: + break + if output: + output_lines.append(output.strip()) + print(f"๐Ÿ“ {output.strip()}") + + # Count trials and kill after specified number + if "Trial" in output and "x1=" in output and "x2=" in output: + trial_count += 1 + if trial_count >= trials_before_kill: + print(f"\n๐Ÿ’€ SIMULATING KERNEL KILL after {trial_count} trials...") + process.terminate() + time.sleep(2) + if process.poll() is None: + process.kill() + break + + except KeyboardInterrupt: + print("\n๐Ÿ’€ SIMULATING KERNEL KILL (KeyboardInterrupt)...") + process.terminate() + time.sleep(1) + if process.poll() is None: + process.kill() + + print(f"๐Ÿ”ด Process terminated after {trial_count} trials") + return trial_count + +def run_experiment_recovery(): + """Run the experiment again to test recovery.""" + print("\n๐Ÿ”„ Testing recovery - running experiment again...") + + try: + # Run the experiment normally + result = subprocess.run([ + sys.executable, "mongodbintegrationmvp.py" + ], capture_output=True, text=True, timeout=60) + + print("๐Ÿ“ Recovery run output:") + for line in result.stdout.split('\n'): + if line.strip(): + print(f" {line}") + + if result.stderr: + print("โš ๏ธ Errors during recovery:") + for line in result.stderr.split('\n'): + if line.strip(): + print(f" {line}") + + return result.returncode == 0 + + except subprocess.TimeoutExpired: + print("โฐ Recovery run timed out (normal for long experiments)") + return True + except Exception as e: + print(f"โŒ Error during recovery run: {e}") + return False + +def main(): + """Main test function.""" + print("=" * 60) + print("๐Ÿงช TESTING MONGODB PERSISTENCE & RECOVERY") + print("=" * 60) + + # Step 1: Check MongoDB + if not check_mongodb_connection(): + print("โŒ Cannot proceed without MongoDB. Please start MongoDB first.") + return + + # Step 2: Check initial state + print("\n๐Ÿ“‹ STEP 1: Checking initial experiment state") + initial_trials = get_experiment_status() + + # Step 3: Run experiment with interruption + print("\n๐Ÿ“‹ STEP 2: Running experiment with simulated interruption") + trials_completed = run_experiment_with_interruption(trials_before_kill=3) + + # Step 4: Check state after interruption + print("\n๐Ÿ“‹ STEP 3: Checking experiment state after interruption") + time.sleep(2) # Give MongoDB time to process + post_interrupt_trials = get_experiment_status() + + # Step 5: Test recovery + print("\n๐Ÿ“‹ STEP 4: Testing recovery from MongoDB") + recovery_success = run_experiment_recovery() + + # Step 6: Final state check + print("\n๐Ÿ“‹ STEP 5: Final experiment state check") + final_trials = get_experiment_status() + + # Summary + print("\n" + "=" * 60) + print("๐Ÿ“Š TEST SUMMARY") + print("=" * 60) + print(f"Initial trials: {initial_trials}") + print(f"Trials after kill: {post_interrupt_trials}") + print(f"Final trials: {final_trials}") + print(f"Recovery successful: {'โœ… YES' if recovery_success else 'โŒ NO'}") + print(f"Data persisted: {'โœ… YES' if post_interrupt_trials > initial_trials else 'โŒ NO'}") + print(f"Experiment continued: {'โœ… YES' if final_trials >= post_interrupt_trials else 'โŒ NO'}") + + if post_interrupt_trials > initial_trials and recovery_success: + print("\n๐ŸŽ‰ SUCCESS: MongoDB persistence is working correctly!") + print(" The experiment successfully resumed from the database after interruption.") + else: + print("\nโŒ FAILURE: There may be issues with MongoDB persistence.") + +if __name__ == "__main__": + main()