sunng87 / pgwire

PostgreSQL wire protocol implemented as a rust library.
Apache License 2.0

Migrating Custom `StartupHandler` from 0.22.0 to 0.24.2 #205

Open Lilit0x opened 6 days ago

Lilit0x commented 6 days ago

I'm constructing a Postgres proxy that simulates an actual Postgres server. For this, I've created a custom StartupHandler and AuthSource using pgwire v0.22.0. The implementation is as follows:

pub struct UdiPgpAuthSource {
    config_tx: mpsc::Sender<Message>,
}

impl UdiPgpAuthSource {
    pub fn new(tx: mpsc::Sender<Message>) -> Self {
        Self { config_tx: tx }
    }

    async fn read_config(&self) -> UdiPgpResult<UdiPgpConfig> {
        // this is where I use the `config_tx`
        ...
    }
}

#[async_trait]
impl AuthSource for UdiPgpAuthSource {
    async fn get_password(&self, login: &LoginInfo) -> PgWireResult<Password> {
        let (supplier_id, _) = UdiPgpProcessor::extract_supplier_and_database(login.database())?;

        ....
    }
}

#[derive(Debug, Clone, new)]
pub struct UdiPgpStartupHandler<A, P> {
    auth_source: A,
    parameter_provider: P,
    config_tx: mpsc::Sender<Message>,
}

impl<V: AuthSource, P: ServerParameterProvider> UdiPgpStartupHandler<V, P> {
    async fn read_config(&self) -> UdiPgpResult<UdiPgpConfig> {
        ...
    }
}

#[async_trait]
impl<V: AuthSource, P: ServerParameterProvider> StartupHandler for UdiPgpStartupHandler<V, P> {
    async fn on_startup<C>(
        &self,
        client: &mut C,
        message: PgWireFrontendMessage,
    ) -> PgWireResult<()>
    where
        C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send,
        C::Error: Debug,
        PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
    {
        info!("Initializing udi-pgp...");
        ....
        Ok(())
    }
}

After upgrading to pgwire v0.24.2, I can't find a way to pass config_tx to the StartupHandler through the new PgWireHandlerFactory:

pub struct UdiPgpProcessorFactory {
    handler: Arc<UdiPgpProcessor>,
}

impl PgWireHandlerFactory for UdiPgpProcessorFactory {
    type StartupHandler = UdiPgpStartupHandler<UdiPgpAuthSource, UdiPgpParameters>;
    type SimpleQueryHandler = UdiPgpProcessor;
    type ExtendedQueryHandler = UdiPgpProcessor;
    type CopyHandler = NoopCopyHandler;

    fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
        self.handler.clone()
    }

    fn extended_query_handler(&self) -> Arc<Self::ExtendedQueryHandler> {
        self.handler.clone()
    }

    // Problem: `tx` is not in scope here, so there is no way to get it to the custom handler
    fn startup_handler(&self) -> Arc<Self::StartupHandler> {
        let mut parameters = UdiPgpParameters::new();

        Arc::new(UdiPgpStartupHandler::new(
            UdiPgpAuthSource::new(tx.clone()),
            UdiPgpParameters::new(),
            tx.clone(),
        ))
    }

    fn copy_handler(&self) -> Arc<Self::CopyHandler> {
        Arc::new(NoopCopyHandler)
    }
}

Do you have a suggestion for how to do this, or would you consider supporting custom parameters in the PgWireHandlerFactory trait? Please let me know if you need any further details. Thanks.

sunng87 commented 4 days ago

Hello @Lilit0x, may I ask whether your startup handler is stateful or stateless? In other words, do you need to create a new instance of the handlers for each incoming connection?

I have to admit that the current name of the trait, PgWireHandlerFactory, may be confusing, because its methods do not let you customize the handlers based on the incoming connection. In most cases, the factory is just a cache for a set of handlers. If your handlers are stateful, you will need to build them before calling process_socket and cache them in a struct that implements PgWireHandlerFactory. If they are stateless, you can simply cache them in a global struct, or create them when those handler methods are called.
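As a rough illustration of the "build the handler first, then cache it" approach described above, here is a minimal sketch using only the standard library. Everything in it is a stand-in and an assumption, not pgwire's API: HandlerFactorySketch only mirrors the shape of the startup_handler method quoted in the issue, Message is a plain String, and std::sync::mpsc replaces whatever channel the real project uses. The point is just that the sender goes into the factory's constructor, so startup_handler can hand out a clone of a pre-built Arc.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;

// Hypothetical stand-in for the real message type sent over `config_tx`.
type Message = String;

// Simplified stand-in for the custom startup handler. The real one would
// implement pgwire's `StartupHandler` trait; this sketch only keeps the sender.
pub struct StartupHandlerSketch {
    config_tx: Sender<Message>,
}

impl StartupHandlerSketch {
    pub fn new(config_tx: Sender<Message>) -> Self {
        Self { config_tx }
    }

    // Sends a request over the channel; returns false if the receiver is gone.
    pub fn request_config(&self) -> bool {
        self.config_tx.send("read-config".to_string()).is_ok()
    }
}

// Local trait (an assumption for illustration) with the same shape as the
// `startup_handler` method shown in the issue: no arguments besides `&self`.
pub trait HandlerFactorySketch {
    type StartupHandler;
    fn startup_handler(&self) -> Arc<Self::StartupHandler>;
}

// The factory acts as a cache: the handler is built once, up front.
pub struct ProcessorFactorySketch {
    startup: Arc<StartupHandlerSketch>,
}

impl ProcessorFactorySketch {
    // The sender is passed in here, before any connection is processed.
    pub fn new(config_tx: Sender<Message>) -> Self {
        Self {
            startup: Arc::new(StartupHandlerSketch::new(config_tx)),
        }
    }
}

impl HandlerFactorySketch for ProcessorFactorySketch {
    type StartupHandler = StartupHandlerSketch;

    fn startup_handler(&self) -> Arc<Self::StartupHandler> {
        // No `tx` needed in scope: just clone the cached handler.
        Arc::clone(&self.startup)
    }
}

// Builds the factory, fetches the handler, and returns what the receiver sees.
pub fn demo_roundtrip() -> Message {
    let (tx, rx) = mpsc::channel::<Message>();
    let factory = ProcessorFactorySketch::new(tx);
    let handler = factory.startup_handler();
    assert!(handler.request_config());
    rx.recv().expect("sender is still alive")
}
```

In the real code, the same idea would presumably mean giving UdiPgpProcessorFactory a field holding the pre-built Arc<UdiPgpStartupHandler<...>> (or the sender itself) and constructing it where tx is available, before process_socket is called.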

I may rename PgWireHandlerFactory to PgWireHandlers in the next patch release, hoping to make this a little clearer.