postgresml / pgcat

PostgreSQL pooler with sharding, load balancing and failover support.
MIT License
2.96k stars 177 forks source link

A PL/pgSQL function that writes data to a table needs to be force-routed to the primary database #485

Open mingjunyang opened 1 year ago

mingjunyang commented 1 year ago

Is your feature request related to a problem? Please describe. My PL/pgSQL function needs to insert data into a table, but as far as I can tell PgCat currently has no way of knowing that this function must only be routed to the primary database.

-- Target table for the write-routing repro.
CREATE TABLE write_test_table (
    ID    bigserial PRIMARY KEY NOT NULL,
    info1 int,
    info2 int
);

-- Inserts 10000 rows (info1 = n, info2 = n * n for n = 1..10000) into
-- write_test_table. Before each insert it raises a NOTICE with the number of
-- rows inserted so far (0..9999). Because it writes, it must run on the primary.
CREATE OR REPLACE FUNCTION test_write_test_table ()
    RETURNS void
    LANGUAGE plpgsql
    AS $$
DECLARE
    t_count int := 0;
BEGIN
    WHILE t_count < 10000 LOOP
        RAISE NOTICE '%', t_count;
        t_count := t_count + 1;
        INSERT INTO write_test_table (info1, info2)
            VALUES (t_count, t_count * t_count);
    END LOOP;
END;
$$;

My pgcat.toml:

[general]
host = "0.0.0.0"
port = 6432

# Number of worker threads the Runtime will use (4 by default).
worker_threads = 8
prepared_statements = true
prepared_statements_cache_size = 500

admin_username = "postgres"
admin_password = "postgres"

[plugins]

[plugins.query_logger]
enabled = true

[pools.demo_db]
pool_mode = "transaction"
default_role = "primary"
query_parser_enabled = true
primary_reads_enabled = true
sharding_function = "pg_bigint_hash"

shards.0 = { servers = [["10.1.1.1", 5432, "primary"],["10.1.1.2", 5432, "replica"]], database = "demo_db"}
users.0 = { username = "postgres", password = "postgres", pool_size = 50 }

Describe the solution you'd like. Should we create a plugin that detects specified queries and forces them to be routed to the primary database? Something like this:

[plugins.force_to_primary]
enabled = true
querys = [
  "select test_write_test_table()",
  "select test_write_test_table_xxx($1,$2)",
  regex".*\s+test_write_test_table\s+.*",
]

Alternatively, does an existing implementation already provide this, or is there a configuration setting that gives me this behavior?

levkk commented 1 year ago

That's an option. Another is to tell PgCat to route the query to the primary:

SET SERVER ROLE TO 'primary';

and then call your function. Once you're done, run:

SET SERVER ROLE TO 'auto';
mingjunyang commented 1 year ago

Wrapping my queries with SET SERVER ROLE TO 'primary'; would require changing my client code, which is difficult for me.

Maybe I can implement this myself, but I am not sure how to handle PluginOutput::OverwriteRouter here:

match plugin_result {
    // The plugin rejected the query: report the error to the client and
    // move on without contacting a server.
    Ok(PluginOutput::Deny(error)) => {
        error_response(&mut self.write, &error).await?;
        continue;
    }

    // The plugin produced the response itself: send it and skip the server.
    Ok(PluginOutput::Intercept(result)) => {
        write_all(&mut self.write, result).await?;
        continue;
    }

    // The plugin only changes *where* the query goes. Unlike the arms above,
    // nothing has been written back to the client yet. A `continue` here would
    // skip forwarding the query, and the client would never get a response.
    // Update the role and fall through so the query is still executed.
    Ok(PluginOutput::OverwriteRouter(role)) => {
        query_router.update_role(role);
    }

    _ => (),
};
diff --git a/src/client.rs b/src/client.rs
index 6c0d06f..4ab6d71 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -854,6 +854,10 @@ where
                                     write_all(&mut self.write, result).await?;
                                     continue;
                                 }
+                                Ok(PluginOutput::OverwriteRouter(result)) => {
+                                    query_router.update_role(result);
+                                    continue;
+                                }

                                 _ => (),
                             };
@@ -1371,6 +1375,10 @@ where
                                 continue;
                             }

+                            Some(PluginOutput::OverwriteRouter(role)) => {
+                                query_router.update_role(role);
+                            }
+
                             _ => (),
                         };

diff --git a/src/config.rs b/src/config.rs
index a2314fc..78470ec 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -754,6 +754,7 @@ pub struct Plugins {
     pub table_access: Option<TableAccess>,
     pub query_logger: Option<QueryLogger>,
     pub prewarmer: Option<Prewarmer>,
+    pub route_query_to_primary : Option<RouteQueryToPrimary>,
 }

 impl std::fmt::Display for Plugins {
@@ -792,6 +793,13 @@ pub struct Prewarmer {
     pub queries: Vec<String>,
 }

+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
+pub struct RouteQueryToPrimary {
+    pub enabled: bool,
+    pub force:bool,
+    pub qeurys: String,
+}
+
 impl Intercept {
     pub fn substitute(&mut self, db: &str, user: &str) {
         for (_, query) in self.queries.iter_mut() {
diff --git a/src/plugins/mod.rs b/src/plugins/mod.rs
index 5ef6009..f6d3125 100644
--- a/src/plugins/mod.rs
+++ b/src/plugins/mod.rs
@@ -12,8 +12,9 @@ pub mod intercept;
 pub mod prewarmer;
 pub mod query_logger;
 pub mod table_access;
+pub mod route_query_to_primary;

-use crate::{errors::Error, query_router::QueryRouter};
+use crate::{errors::Error, query_router::QueryRouter, config::Role};
 use async_trait::async_trait;
 use bytes::BytesMut;
 use sqlparser::ast::Statement;
@@ -21,6 +22,7 @@ use sqlparser::ast::Statement;
 pub use intercept::Intercept;
 pub use query_logger::QueryLogger;
 pub use table_access::TableAccess;
+pub use route_query_to_primary::RouteQueryToPrimary;

 #[derive(Clone, Debug, PartialEq)]
 pub enum PluginOutput {
@@ -28,6 +30,7 @@ pub enum PluginOutput {
     Deny(String),
     Overwrite(Vec<Statement>),
     Intercept(BytesMut),
+    OverwriteRouter(Role)
 }

 #[async_trait]
diff --git a/src/query_router.rs b/src/query_router.rs
index 126b813..a40ae22 100644
--- a/src/query_router.rs
+++ b/src/query_router.rs
@@ -136,6 +136,11 @@ impl QueryRouter {
         &self.pool_settings
     }

+    pub fn update_role<'a>(&'a mut self,role: Role) -> bool {
+        self.active_role=Some(role);
+        true
+    }
+
     /// Try to parse a command and execute it.
     pub fn try_execute_command(&mut self, message_buffer: &BytesMut) -> Option<(Command, String)> {
         let mut message_cursor = Cursor::new(message_buffer);

src/plugins/route_query_to_primary.rs

use async_trait::async_trait;
use regex::Regex;
use sqlparser::ast::{visit_expressions, Statement};

use crate::{
    errors::Error,
    plugins::{Plugin, PluginOutput},
    query_router::QueryRouter,
};

use log::info;

use core::ops::ControlFlow;

pub struct RouteQueryToPrimary<'a> {
    pub enabled: bool,
    pub force: bool,
    pub regex: &'a Regex,
}

#[async_trait]
impl<'a> Plugin for RouteQueryToPrimary<'a> {
    async fn run(
        &mut self,
        _query_router: &QueryRouter,
        ast: &Vec<Statement>,
    ) -> Result<PluginOutput, Error> {
        if !self.enabled {
            return Ok(PluginOutput::Allow);
        }

        let mut force_route = false;

        visit_expressions(ast, |express| {
            let expr = express.to_string();
            if self.regex.is_match(&expr) {
                force_route = true;
            }
            ControlFlow::<()>::Continue(())
        });

        if force_route && self.force {
            info!("force route query to primary: {}", force_route);
            Ok(PluginOutput::OverwriteRouter(crate::config::Role::Primary))
        } else {
            Ok(PluginOutput::Allow)
        }
    }
}

This code is only a rough sketch and has not been tested yet. I will try it later.