camel-ai / camel

🐫 CAMEL: Finding the Scaling Law of Agents. A multi-agent framework. https://www.camel-ai.org
https://www.camel-ai.org
Apache License 2.0
5.35k stars 653 forks source link

[Feature Request] Storage of `Message` generated from agent/roleplaying/workforce #755

Open Wendong-Fan opened 1 month ago

Wendong-Fan commented 1 month ago

Required prerequisites

Motivation

support message storage

Solution

SQLLite

MySQL

PostgreSQL https://www.postgresql.org/

PostgreSQL supports complex data types such as JSON and JSONB, which can be useful for storing the structured message content with varied data types (text, image, audio, video, function_calling, etc.). It’s also highly extensible, allowing you to define custom functions, data types, and operators that can be very useful for your specific use case.

Alternatives

No response

Additional context

No response

raywhoelse commented 1 month ago

After me and wendong discuss, we decided to use PostgreSQL as our first integrate database components, and we decided use Factory Pattern as our database components Design Pattern, and there are some fake code can be referenced

raywhoelse commented 1 month ago
from abc import ABC, abstractmethod
import psycopg2
import mysql.connector

# Generic database interface
class DatabaseInterface(ABC):

    @abstractmethod
    def connect(self):
        pass

    @abstractmethod
    def execute_query(self, query: str):
        pass

    @abstractmethod
    def close(self):
        pass

# PostgreSQL adapter
class PostgreSQLAdapter(DatabaseInterface):

    def __init__(self, database, user, password, host, port):
        self.connection = None
        self.database = database
        self.user = user
        self.password = password
        self.host = host
        self.port = port

    def connect(self):
        self.connection = psycopg2.connect(
            database=self.database,
            user=self.user,
            password=self.password,
            host=self.host,
            port=self.port
        )
        print("Connected to PostgreSQL")

    def execute_query(self, query: str):
        cursor = self.connection.cursor()
        cursor.execute(query)
        self.connection.commit()
        cursor.close()
        print("Query executed")

    def close(self):
        if self.connection:
            self.connection.close()
            print("PostgreSQL connection closed")

# MySQL adapter
class MySQLAdapter(DatabaseInterface):

    def __init__(self, database, user, password, host, port):
        self.connection = None
        self.database = database
        self.user = user
        self.password = password
        self.host = host
        self.port = port

    def connect(self):
        self.connection = mysql.connector.connect(
            database=self.database,
            user=self.user,
            password=self.password,
            host=self.host,
            port=self.port
        )
        print("Connected to MySQL")

    def execute_query(self, query: str):
        cursor = self.connection.cursor()
        cursor.execute(query)
        self.connection.commit()
        cursor.close()
        print("Query executed")

    def close(self):
        if self.connection:
            self.connection.close()
            print("MySQL connection closed")

# Database factory
class DatabaseFactory:

    @staticmethod
    def get_database_adapter(db_type: str, **kwargs) -> DatabaseInterface:
        if db_type == 'postgresql':
            return PostgreSQLAdapter(**kwargs)
        elif db_type == 'mysql':
            return MySQLAdapter(**kwargs)
        else:
            raise ValueError(f"Unsupported database type: {db_type}")

# Use factory pattern and adapter pattern for database operations
def main():
    db_config = {
        'database': 'testdb',
        'user': 'testuser',
        'password': 'testpass',
        'host': 'localhost',
        'port': '5432'
    }

    # Use PostgreSQL
    postgres_adapter = DatabaseFactory.get_database_adapter('postgresql', **db_config)
    postgres_adapter.connect()
    postgres_adapter.execute_query("CREATE TABLE IF NOT EXISTS test (id SERIAL PRIMARY KEY, name VARCHAR(50));")
    postgres_adapter.close()

    # Modify config for MySQL
    db_config['port'] = '3306'

    # Use MySQL
    mysql_adapter = DatabaseFactory.get_database_adapter('mysql', **db_config)
    mysql_adapter.connect()
    mysql_adapter.execute_query("CREATE TABLE IF NOT EXISTS test (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(50));")
    mysql_adapter.close()

if __name__ == "__main__":
    main()