theseus-rs / postgresql-embedded

Embed PostgreSQL database
Apache License 2.0

[postgresql-extensions] Settings trait not thread safe #121

Open spikecodes opened 1 month ago

spikecodes commented 1 month ago

I'm trying to use the new install command from the postgresql-extensions crate, but I can't pass the Settings object (whether I clone it, reconstruct it, or use a fresh Settings::default()) without getting this error in my Rocket `#[launch]` function:

`dyn postgresql_commands::traits::Settings` cannot be shared between threads safely
the trait `Sync` is not implemented for `dyn postgresql_commands::traits::Settings`, which is required by `{async block@src/lib.rs:78:32: 78:33}: std::marker::Send`
required for `&dyn postgresql_commands::traits::Settings` to implement `std::marker::Send`
extensions.rs(136, 77): required because it's used within this `async` fn body
extensions.rs(39, 1): required because it's used within this `async` block
extensions.rs(39, 1): required because it's used within this `async` fn body
db_postgres.rs(130, 80): required because it's used within this `async` fn body
db_postgres.rs(81, 67): required because it's used within this `async` fn body
db_postgres.rs(77, 37): required because it's used within this `async` fn body
lib.rs(78, 32): required because it's used within this `async` block
lib.rs(266, 66): required by a bound in `async_main`

My code:

use pgvector::Vector;
use postgresql_embedded::{PostgreSQL, Settings};
use postgresql_extensions::{get_installed_extensions, install as install_extension};
use semver::VersionReq;
use std::path::{Path, PathBuf};

#[get("/hello/<name>/<age>")]
fn hello(name: &str, age: u8) -> String {
    format!("Hello, {} year old named {}!", age, name)
}

#[launch]
pub async fn start_server() -> _ {
    let database: db_postgres::VectorDB = db_postgres::VectorDB::new().await.unwrap();
    info!("Database initialized");
    rocket::build().mount("/", routes![hello])
}

impl VectorDB {
    pub async fn new() -> Result<Self> {
        Self::with_config(VectorDBConfig::default()).await
    }

    pub async fn with_config(config: VectorDBConfig) -> Result<Self> {
        let storage_dir: PathBuf = Path::new(&config.clone().path).to_path_buf();

        println!("Installing PostgreSQL v{}", PG_VERSION);

        let mut settings = Settings::new();
        settings.password_file = storage_dir.join(".pgpass");
        if settings.password_file.exists() {
            settings.password = std::fs::read_to_string(settings.password_file.clone())?;
        }

        let installation_dir = storage_dir.join("pg");
        let data_dir = storage_dir.join("pg_data");
        settings.installation_dir.clone_from(&installation_dir);
        settings.data_dir.clone_from(&data_dir);
        settings.temporary = false;
        settings.version = VersionReq::parse(format!("={}", PG_VERSION).as_str())?;

        info!("Starting PostgreSQL v{}", PG_VERSION);
        let mut postgresql = PostgreSQL::new(settings.clone());
        postgresql.setup().await?;
        postgresql.start().await?;

        if !postgresql.database_exists(DATABASE_NAME).await? {
            info!("Creating database '{}'", DATABASE_NAME);
            postgresql.create_database(DATABASE_NAME).await?;
        }
        let database_url = postgresql.settings().url(DATABASE_NAME);

        let pool = PgPool::connect(database_url.as_str()).await?;

        // For testing: Remove pgvector extension if it exists
        sqlx::query("DROP EXTENSION IF EXISTS vector CASCADE;")
            .execute(&pool)
            .await?;

        let db = Self {
            pool,
            config,
            postgresql,
        };

        db.setup_pg_vectors_extension(&settings).await?;
        db.setup_tables().await?;
        db.setup_indexes().await?;

        Ok(db)
    }

    async fn setup_pg_vectors_extension(&self, settings: &Settings) -> Result<()> {
        info!("Checking if pg_vectors extension is installed");
        if get_installed_extensions(settings).await?.is_empty() {
            info!("Installing pg_vectors extension");

            install_extension(
                settings,
                "portal-corp",
                "pgvector_compiled",
                &VersionReq::parse("=0.16.12")?,
            )
            .await?;
        }

        self.enable_pg_vectors_extension().await?;

        Ok(())
    }

    async fn enable_pg_vectors_extension(&self) -> Result<()> {
        let query = "CREATE EXTENSION IF NOT EXISTS vector;";
        sqlx::query(query).execute(&self.pool).await?;
        Ok(())
    }
// ...
}

Maybe the fix would be for the install function to take a trait object that is also bounded by Send and Sync, so the Rust compiler can accept that the Settings value is thread-safe? Or to wrap it in a Box?


#[instrument(level = "debug", skip(settings))]
pub async fn install(
    settings: Box<&(dyn Settings + Send + Sync)>,
    namespace: &str,
    name: &str,
    version: &VersionReq,
) -> Result<()> {
    // ...
}

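A minimal sketch of the first idea, using only the standard library. The `Settings` trait, `LocalSettings`, the `install` body, `yield_point`, `assert_send`, `block_on`, and `demo` below are hypothetical stand-ins, not the actual postgresql-extensions API. The sketch only illustrates that a future holding `&(dyn Settings + Send + Sync)` across an `.await` is `Send`, whereas one holding a plain `&dyn Settings` is not. Note the parentheses, which Rust requires when adding bounds to a `dyn` type behind a reference.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for `postgresql_commands::traits::Settings`.
trait Settings {
    fn host(&self) -> String;
}

struct LocalSettings;

impl Settings for LocalSettings {
    fn host(&self) -> String {
        "localhost".to_string()
    }
}

// Stand-in for real async work; forces `settings` to live across an `.await`.
async fn yield_point() {}

// Assumed signature: the explicit `Send + Sync` bounds on the trait object
// let the compiler prove that the returned future is `Send`.
async fn install(settings: &(dyn Settings + Send + Sync), name: &str) -> String {
    yield_point().await;
    format!("{}@{}", name, settings.host())
}

// Compile-time check that mirrors a `Send` bound placed on a launcher's future.
fn assert_send<F: Future + Send>(future: F) -> F {
    future
}

// Tiny executor for the sketch; it busy-polls, which is fine for futures
// that never actually wait on anything.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

fn demo() -> String {
    let settings = LocalSettings;
    // This line compiles only because the future returned by `install` is `Send`.
    block_on(assert_send(install(&settings, "vector")))
}
```

Changing the parameter back to `&dyn Settings` should make the `assert_send` call fail to compile with an error like the one above. Declaring the trait as `trait Settings: Send + Sync` would presumably have the same effect without touching each signature, though whether either change suits the crate is for the maintainers to decide.
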
brianheineman commented 1 month ago

Hello @spikecodes, I am not that familiar with Rocket, but for your example you might be able to address the issue by setting up the embedded PostgreSQL instance before running Rocket, then connecting to it using whatever pattern Rocket prescribes for connecting to a database. I created an Axum example that illustrates this approach in #123.
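One way to read this suggestion, sketched with only the standard library: drive the setup future on something that does not require `Send`, then hand the server nothing but plain data such as the connection URL. Everything below is a hypothetical stand-in (the `Settings` trait, the URL format, `setup_database`, `block_on`, `serve`, `run`); it is not the Axum example from #123 and not Rocket's API.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Hypothetical stand-in for the crate's `Settings` trait, with no
// `Send`/`Sync` bounds, as in the error report.
trait Settings {
    fn url(&self, database: &str) -> String;
}

struct LocalSettings;

impl Settings for LocalSettings {
    fn url(&self, database: &str) -> String {
        // Illustrative format only; the real URL carries credentials and a port.
        format!("postgresql://localhost/{}", database)
    }
}

// Stand-in for real async work.
async fn yield_point() {}

// Holds `&dyn Settings` across an `.await`, so this future is not `Send`.
async fn setup_database(settings: &dyn Settings, database: &str) -> String {
    yield_point().await;
    settings.url(database)
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// A driver with no `Send` bound on the future, so it accepts `setup_database`.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

// Stand-in for a framework that moves work to other threads: it receives
// only an owned `String`, which is `Send`, never the settings themselves.
fn serve(database_url: String) -> String {
    thread::spawn(move || format!("serving with {}", database_url))
        .join()
        .expect("server thread panicked")
}

fn run() -> String {
    let settings = LocalSettings;
    // Step 1: database setup, finished before the server starts.
    let database_url = block_on(setup_database(&settings, "test"));
    // Step 2: the server sees only the resulting URL.
    serve(database_url)
}
```

The point of the split is that the non-`Send` future never reaches the part of the program that requires `Send`; only its result does.
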

spikecodes commented 1 month ago

Hello @brianheineman, thanks for sharing that Axum example. I will try the Rocket equivalent to see if that fixes the thread safety issues.