Dooders / Experiments

Repository for all specific experiments and tests
0 stars 0 forks source link

Feature: Implement `ExperimentDatabase` Class for Managing Simulation Results #61

Open csmangum opened 2 hours ago

csmangum commented 2 hours ago

The ExperimentDatabase class is a high-level database system designed to manage multiple simulation runs executed by the SimulationDatabase. This class should track simulation metadata, manage references to individual simulation databases, and aggregate results across simulations. The implementation will help centralize management and analysis of experiments, while keeping detailed simulation data isolated in their respective databases.


Goals


Acceptance Criteria

  1. Schema Definition

    • Create a Simulations table with the following columns:
      • simulation_id: Primary key.
      • start_time: Unix timestamp for the start of the simulation.
      • end_time: Unix timestamp for the end of the simulation.
      • status: Text field for simulation status (pending, running, completed, failed).
      • parameters: JSON-encoded string of simulation parameters.
      • results_summary: JSON-encoded string summarizing simulation results.
      • simulation_db_path: File path to the corresponding SimulationDatabase.
  2. Thread Safety

    • Use thread-local storage for SQLite connections and cursors to ensure thread safety.
  3. CRUD Methods

    • Add Simulation: Insert a new simulation record with parameters and database path.
    • Update Status: Update simulation status, end time, and results summary.
    • Retrieve Simulation: Fetch details of a specific simulation by ID.
    • List Simulations: Fetch all simulations, optionally filtered by status.
    • Delete Simulation: Remove a simulation record from the database.
  4. Export and Aggregation

    • Implement an export_experiment_data(filepath) method to save all data to a CSV file.
    • Implement a get_aggregate_results() method to aggregate metrics across all completed simulations.
  5. Integration with SimulationDatabase

    • Store references to individual simulation database file paths in simulation_db_path.
    • Enable interaction with SimulationDatabase instances for detailed data retrieval.
  6. Error Handling

    • Ensure proper error handling and logging for all database operations.
    • Catch and handle foreign key constraint violations.
  7. Documentation

    • Provide docstrings for all methods and classes.
    • Include usage examples for adding, updating, and retrieving simulations.

Implementation Steps

  1. Design Schema

    • Create a Simulations table with appropriate indices for performance.
  2. Initialize Class

    • Implement a constructor to initialize the database and create tables if they don’t exist.
  3. CRUD Methods

    • Implement methods for adding, updating, deleting, and retrieving simulations.
  4. Thread Safety

    • Use thread-local storage for connections and cursors.
  5. Export Data

    • Use pandas to export experiment data to CSV format.
  6. Aggregate Results

    • Design logic for aggregating results across completed simulations.
  7. Test Integration

    • Write test cases to verify integration with SimulationDatabase.
  8. Error Handling

    • Add robust error handling for transactional operations.

Tasks


Additional Notes


References

csmangum commented 2 hours ago

Implementation with SQLAlchemy ORM

Database Schema

from sqlalchemy import (
    create_engine, Column, Integer, String, Float, Text, ForeignKey, JSON, DateTime
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
from datetime import datetime

Base = declarative_base()

class Simulation(Base):
    __tablename__ = "simulations"

    simulation_id = Column(Integer, primary_key=True, autoincrement=True)
    start_time = Column(DateTime, default=datetime.utcnow)
    end_time = Column(DateTime, nullable=True)
    status = Column(String(50), default="pending")
    parameters = Column(JSON, nullable=False)
    results_summary = Column(JSON, nullable=True)
    simulation_db_path = Column(String(255), nullable=False)

    # Relationships (optional, for future extensions)
    # e.g., could relate to logs, individual agent data, etc.
    # logs = relationship("Log", back_populates="simulation")

    def __repr__(self):
        return f"<Simulation(simulation_id={self.simulation_id}, status={self.status})>"

ExperimentDatabase Class

class ExperimentDatabase:
    def __init__(self, db_url: str):
        """
        Initialize the ExperimentDatabase with a given database URL.
        Example db_url: 'sqlite:///experiment.db'
        """
        self.engine = create_engine(db_url)
        Base.metadata.create_all(self.engine)  # Create tables if not exist
        self.Session = sessionmaker(bind=self.engine)

    def add_simulation(self, parameters: dict, simulation_db_path: str) -> int:
        """
        Add a new simulation record to the database.

        Parameters
        ----------
        parameters : dict
            JSON-serializable dictionary of simulation parameters.
        simulation_db_path : str
            File path to the associated simulation database.

        Returns
        -------
        int
            The simulation_id of the newly inserted simulation.
        """
        with self.Session() as session:
            new_simulation = Simulation(
                parameters=parameters,
                simulation_db_path=simulation_db_path,
                status="pending",
            )
            session.add(new_simulation)
            session.commit()
            return new_simulation.simulation_id

    def update_simulation_status(
        self,
        simulation_id: int,
        status: str,
        results_summary: dict = None,
    ):
        """
        Update the status and results of a simulation.

        Parameters
        ----------
        simulation_id : int
            The ID of the simulation to update.
        status : str
            New status of the simulation.
        results_summary : dict, optional
            JSON-serializable dictionary summarizing the results.
        """
        with self.Session() as session:
            simulation = session.query(Simulation).filter_by(simulation_id=simulation_id).one()
            simulation.status = status
            simulation.end_time = datetime.utcnow() if status in ["completed", "failed"] else None
            if results_summary:
                simulation.results_summary = results_summary
            session.commit()

    def get_simulation(self, simulation_id: int) -> Simulation:
        """
        Retrieve details of a specific simulation.

        Parameters
        ----------
        simulation_id : int
            The ID of the simulation to retrieve.

        Returns
        -------
        Simulation
            The simulation record as an SQLAlchemy ORM object.
        """
        with self.Session() as session:
            return session.query(Simulation).filter_by(simulation_id=simulation_id).one()

    def list_simulations(self, status: str = None) -> list:
        """
        List all simulations, optionally filtered by status.

        Parameters
        ----------
        status : str, optional
            Filter simulations by status.

        Returns
        -------
        list
            List of simulation records.
        """
        with self.Session() as session:
            query = session.query(Simulation)
            if status:
                query = query.filter_by(status=status)
            return query.order_by(Simulation.start_time.desc()).all()

    def delete_simulation(self, simulation_id: int):
        """
        Delete a simulation record.

        Parameters
        ----------
        simulation_id : int
            The ID of the simulation to delete.
        """
        with self.Session() as session:
            simulation = session.query(Simulation).filter_by(simulation_id=simulation_id).one()
            session.delete(simulation)
            session.commit()

    def export_experiment_data(self, filepath: str):
        """
        Export all experiment data to a CSV file.

        Parameters
        ----------
        filepath : str
            Path where the CSV file should be saved.
        """
        import pandas as pd

        with self.Session() as session:
            simulations = session.query(Simulation).all()
            data = [
                {
                    "simulation_id": sim.simulation_id,
                    "start_time": sim.start_time,
                    "end_time": sim.end_time,
                    "status": sim.status,
                    "parameters": sim.parameters,
                    "results_summary": sim.results_summary,
                    "simulation_db_path": sim.simulation_db_path,
                }
                for sim in simulations
            ]
            df = pd.DataFrame(data)
            df.to_csv(filepath, index=False)

    def get_aggregate_results(self) -> dict:
        """
        Aggregate results across completed simulations.

        Returns
        -------
        dict
            Aggregated results across simulations.
        """
        with self.Session() as session:
            completed_simulations = session.query(Simulation).filter_by(status="completed").all()
            aggregate_results = {}
            for sim in completed_simulations:
                if sim.results_summary:
                    for key, value in sim.results_summary.items():
                        if isinstance(value, (int, float)):
                            aggregate_results[key] = aggregate_results.get(key, 0) + value

            # Normalize results by the number of simulations
            num_simulations = len(completed_simulations)
            for key in aggregate_results:
                aggregate_results[key] /= num_simulations

            return aggregate_results

Example Usage

# Initialize ExperimentDatabase
experiment_db = ExperimentDatabase("sqlite:///experiment.db")

# Add a new simulation
simulation_id = experiment_db.add_simulation(
    parameters={"num_agents": 100, "steps": 500},
    simulation_db_path="simulations/simulation_1.db",
)
print(f"New simulation added with ID: {simulation_id}")

# Update simulation status
experiment_db.update_simulation_status(
    simulation_id=simulation_id,
    status="completed",
    results_summary={"total_agents": 100, "average_lifespan": 50.5},
)

# Retrieve a simulation
simulation = experiment_db.get_simulation(simulation_id)
print(f"Simulation Details: {simulation}")

# List all completed simulations
completed_sims = experiment_db.list_simulations(status="completed")
print(f"Completed Simulations: {completed_sims}")

# Export data to CSV
experiment_db.export_experiment_data("experiment_results.csv")

# Aggregate results
aggregate_results = experiment_db.get_aggregate_results()
print(f"Aggregate Results: {aggregate_results}")

# Delete a simulation
experiment_db.delete_simulation(simulation_id)

Benefits of Using SQLAlchemy ORM

  1. Declarative Model: The schema is represented as Python classes, improving code readability.
  2. Type Safety: ORM automatically maps Python types to SQL types, reducing errors.
  3. Session Management: SQLAlchemy provides a robust session system to handle transactions and rollbacks.
  4. Relationships: Easily extendable to include relationships between tables.
  5. Database Agnosticism: The same code can be used with different databases (SQLite, PostgreSQL, MySQL, etc.).
  6. Aggregation and Queries: SQLAlchemy ORM simplifies complex queries and aggregations.