sfackler / rust-postgres

Native PostgreSQL driver for the Rust programming language
Apache License 2.0

Question: how do I retrieve all `RAISE NOTICE` calls made during a particular query in tokio-postgres #1048

Closed stevenliebregt closed 1 year ago

stevenliebregt commented 1 year ago

Hi, I'm pretty new to async Rust and tokio-postgres, and I'm trying to implement a wrapper around tokio-postgres.

I have a struct called Repo which stores the current client. During initialization I hand the connection to tokio::spawn so that queries actually get executed.

pub struct Repo {
    pub(crate) postgres_client: tokio_postgres::Client,
    ...
}

impl Repo {
    pub async fn from_config(
        config: tokio_postgres::Config,
    ) -> Result<Self, Error> {
        // The connection is only moved into tokio::spawn below, so it does not need to be `mut`.
        let (postgres_client, postgres_connection) = config.connect(tokio_postgres::NoTls).await?;

        tokio::spawn(postgres_connection);

        Ok(Self {
            postgres_client,
        })
    }
}

Now I have a method on the Repo that I call to execute a query, and in there I would like to fetch all RAISE NOTICE messages that occurred during the execution of that query.

impl Repo {
    pub async fn my_query_fn(&self) {
        // `query` returns a future, so it has to be awaited before unwrapping the result.
        self.postgres_client.query("COOL QUERY THAT RAISES NOTICES SOMEWHERE", &[]).await.unwrap();

        // Now here I would like to do something like
        let notices = self.postgres_client.notices(); // Which consumes all notices on some global stack or something.
    }
}

Is there any support for this?

I see that the non-async variant has a notice_callback option in the config.

sfackler commented 1 year ago

The connection returns a stream of events from the server. Rather than just spawning it off as your code does above, you can grab those messages and provide them to your other code with e.g. a channel. Here's a test that does that: https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/main.rs#L696
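The shape of that pattern can be sketched without any database, using only the standard library. This is a model, not tokio-postgres code: `ServerEvent` is a hypothetical stand-in for `tokio_postgres::AsyncMessage`, `spawn_connection` is a hypothetical helper, an iterator plays the role of the connection's event stream, and a thread plays the role of the spawned task.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Hypothetical stand-in for `tokio_postgres::AsyncMessage`.
#[derive(Debug, Clone, PartialEq)]
pub enum ServerEvent {
    Notice(String),
    Notification(String),
}

/// Hypothetical helper: drives a "connection" (modelled here as any source
/// of events) on a background thread and forwards every event into a channel.
/// The caller keeps the receiving end and can read events whenever it likes.
pub fn spawn_connection<I>(events: I) -> (Receiver<ServerEvent>, thread::JoinHandle<()>)
where
    I: IntoIterator<Item = ServerEvent> + Send + 'static,
{
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        for event in events {
            // Stop forwarding if the receiving side has gone away.
            if tx.send(event).is_err() {
                break;
            }
        }
        // `tx` is dropped when the thread finishes, which closes the channel.
    });
    (rx, handle)
}
```

The point of the design is that the connection is still spawned, so queries keep running for every caller; only code that cares about notices needs to look at the receiver.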

stevenliebregt commented 1 year ago

If I have other functions that don't need to listen for notices, will their queries not be executed, since the connection is no longer spawned off anywhere?

sfackler commented 1 year ago

You do spawn off the connection: https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/main.rs#L707

stevenliebregt commented 1 year ago

I think I don't fully understand. My struct now looks like this:

struct Repo {
    pub(crate) postgres_client: tokio_postgres::Client,
    // This thing I got from the `mpsc::unbounded()` call
    pub(crate) rx: futures::channel::mpsc::UnboundedReceiver<tokio_postgres::AsyncMessage>
}

In my impl block I'm now trying to get the results out:

impl Repo {
    async fn my_query_fn(&self) {
        ...

        self.postgres_client.query("my query", &[]).await.unwrap();

        let notices = &self.rx.filter_map(|m| match m {
            AsyncMessage::Notice(n) => future::ready(Some(n)),
            _ => future::ready(None),
        })
            .collect::<Vec<_>>()
            .await;
    }
}

But that does not work, since collect takes ownership of rx and I want this method to be callable repeatedly. What am I missing?

sfackler commented 1 year ago

You can use StreamExt::by_ref to avoid taking ownership, but collect is going to wait until the stream ends (i.e. when the connection drops), which you probably don't want. https://docs.rs/futures/latest/futures/prelude/stream/trait.StreamExt.html#method.by_ref
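One way around the "collect waits for the end of the stream" problem is to take only what is already queued. The sketch below models that with the standard library's `Receiver::try_iter`, which never blocks. `ServerEvent` and `NoticeInbox` are hypothetical names, not part of tokio-postgres. With the futures channel from the snippet above, `UnboundedReceiver::try_next` may play a similar role, though it needs `&mut self`. Whether every notice for a query has already been forwarded by the time the query returns is an assumption that would need checking.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical stand-in for `tokio_postgres::AsyncMessage`.
#[derive(Debug, Clone, PartialEq)]
pub enum ServerEvent {
    Notice(String),
    Notification(String),
}

/// Hypothetical wrapper around the receiving end of the event channel.
pub struct NoticeInbox {
    rx: Receiver<ServerEvent>,
}

impl NoticeInbox {
    pub fn new(rx: Receiver<ServerEvent>) -> Self {
        Self { rx }
    }

    /// Returns the notices queued so far. `try_iter` stops as soon as the
    /// channel is empty, so this neither blocks nor consumes the receiver
    /// and can be called after every query. Non-notice events that are
    /// pulled off the channel here are discarded.
    pub fn drain_notices(&self) -> Vec<String> {
        self.rx
            .try_iter()
            .filter_map(|event| match event {
                ServerEvent::Notice(text) => Some(text),
                _ => None,
            })
            .collect()
    }
}
```

Calling `drain_notices` twice in a row yields the pending notices the first time and an empty vector the second time, while the channel stays open for later queries.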

stevenliebregt commented 1 year ago

No, I indeed don't want to wait until the stream ends. I guess my structure does not allow me to do what I need to do. I'll have to find a way around the reliance on notices (which is probably a good thing :))