sfackler / rust-postgres

Native PostgreSQL driver for the Rust programming language
Apache License 2.0
3.42k stars 436 forks source link

Listen for PSQL notification using tokio_postgres? #1098

Closed NielDuysters closed 7 months ago

NielDuysters commented 7 months ago

PostgreSQL has a NOTIFY/LISTEN feature that lets you send a notification when a specific event happens in your database, for example when a new row is inserted.

Now I want to be able to listen for such a notification in Rust using tokio_postgres and output it. Tokio does seem to support it.

I tried multiple approaches.

FIRST APPROACH I tried the following example but it yields an error.

//#![feature(poll_map)] -> Gives error - #![feature] may not be used on the stable release channel
use futures::{stream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use std::env;
use tokio::sync::mpsc;
use tokio_postgres::{connect, NoTls};

#[tokio::main]
async fn main() { // First attempt: forward messages polled from the connection into a channel, LISTEN, then wait on the receiver.
    let connection_parameters = env::var("DBURL").unwrap(); // connection string is read from the DBURL environment variable
    let (client, mut conn) = connect(&connection_parameters, NoTls).await.unwrap();

    let (tx, mut rx) = mpsc::unbounded_channel(); // `mpsc` is `tokio::sync::mpsc` in this snippet
    let stream = stream::poll_fn(move |cx| conn.poll_message(cx).map_err(|e| panic!(e))); // `map_err` is applied inside the closure, to the value returned by `poll_message`
    let c = stream.forward(tx).map(|r| r.unwrap()); // rejected with E0277: tokio's `UnboundedSender` does not implement `futures::Sink`
    tokio::spawn(c);
    println!("After spawn listener");

    client.batch_execute("LISTEN test_notifications;").await.unwrap();
    loop {
        let m = rx.recv().await; // `m` is never inspected
        println!("GOT MESSAGE"); // printed every time `recv()` completes, whatever it returned
    }        
}
error[E0277]: the trait bound `tokio::sync::mpsc::UnboundedSender<_>: futures::Sink<AsyncMessage>` is not satisfied
    --> src/algotester.rs:15:28
     |
15   |     let c = stream.forward(tx).map(|r| r.unwrap());
     |                    ------- ^^ the trait `futures::Sink<AsyncMessage>` is not implemented for `tokio::sync::mpsc::UnboundedSender<_>`
     |                    |
     |                    required by a bound introduced by this call
     |
     = help: the following other types implement trait `futures::Sink<Item>`:
               <futures::futures_channel::mpsc::Sender<T> as futures::Sink<T>>
               <futures::futures_channel::mpsc::UnboundedSender<T> as futures::Sink<T>>
               <Box<S> as futures::Sink<Item>>
               <tokio_postgres::connect_raw::StartupStream<S, T> as futures::Sink<tokio_postgres::codec::FrontendMessage>>
               <tokio_util::io::stream_reader::StreamReader<S, E> as futures::Sink<T>>
               <tokio_util::sync::mpsc::PollSender<T> as futures::Sink<T>>
               <tokio_util::io::copy_to_bytes::CopyToBytes<S> as futures::Sink<&'a [u8]>>
               <tokio_util::codec::framed_write::FramedWrite<T, E> as futures::Sink<I>>
             and 66 others
note: required by a bound in `futures::StreamExt::forward`
    --> /Users/name/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.29/src/stream/stream/mod.rs:1560:12
     |
1558 |     fn forward<S>(self, sink: S) -> Forward<Self, S>
     |        ------- required by a bound in this associated function
1559 |     where
1560 |         S: Sink<Self::Ok, Error = Self::Error>,
     |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `StreamExt::forward`

error[E0599]: the method `map` exists for struct `Forward<PollFn<...>, ...>`, but its trait bounds were not satisfied
  --> src/algotester.rs:15:32
   |
15 |       let c = stream.forward(tx).map(|r| r.unwrap());
   |                                  ^^^ method cannot be called on `Forward<PollFn<...>, ...>` due to unsatisfied trait bounds
   |
  ::: /Users/name/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.29/src/stream/stream/mod.rs:89:1
   |
89 | / delegate_all!(
90 | |     /// Future for the [`forward`](super::StreamExt::forwa...
91 | |     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
92 | |     Forward<St, Si>(
...  |
95 | |     where St: TryStream
96 | | );
   | | -
   | | |
   | | doesn't satisfy `_: FutureExt`
   | | doesn't satisfy `_: Future`
   | | doesn't satisfy `_: Iterator`
   | |_doesn't satisfy `_: StreamExt`
   |   doesn't satisfy `_: Stream`
   |
   = note: the full type name has been written to '/Users/name/Sites/keyrock/rust-api/target/debug/deps/algotester-585fdee191a40d3c.long-type-2506645915138413193.txt'
   = note: the following trait bounds were not satisfied:
           `Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: Stream`
           which is required by `Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: StreamExt`
           `Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: futures::Future`
           which is required by `Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: FutureExt`
           `&Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: Stream`
           which is required by `&Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: StreamExt`
           `&Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: futures::Future`
           which is required by `&Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: FutureExt`
           `&mut Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: Stream`
           which is required by `&mut Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: StreamExt`
           `&mut Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: futures::Future`
           which is required by `&mut Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: FutureExt`
           `Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: Iterator`
           which is required by `&mut Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: Iterator`

warning: unused import: `TryStreamExt`
 --> src/algotester.rs:3:26
  |
3 | use futures::{FutureExt, TryStreamExt};
  |                          ^^^^^^^^^^^^
  |
  = note: `#[warn(unused_imports)]` on by default

warning: unused import: `FutureExt`
 --> src/algotester.rs:3:15
  |
3 | use futures::{FutureExt, TryStreamExt};
  |               ^^^^^^^^^

Some errors have detailed explanations: E0277, E0599.
For more information about an error, try `rustc --explain E0277`.
warning: `rust-api` (bin "algotester") generated 2 warnings

SECOND APPROACH Comes from this example.

//#![feature(poll_map)] -> Gives error - #![feature] may not be used on the stable release channel
use futures::{stream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use std::env;
use tokio::sync::mpsc;
use tokio_postgres::{connect, NoTls};

#[tokio::main]
async fn main() { // Second attempt: LISTEN through the client, then read notifications from the connection with a blocking iterator.
    let connection_parameters = env::var("DBURL").unwrap(); // connection string is read from the DBURL environment variable
    let (client, mut conn) = connect(&connection_parameters, NoTls).await.unwrap();

    // conn.execute() not found so adapting example.
    //conn.execute("LISTEN myevent", &[]).expect("Could not send LISTEN");
    client.query("LISTEN myevent", &[]).await.expect("Could not send LISTEN");

    let notifications = conn.notifications(); // rejected with E0599: `Connection<Socket, NoTlsStream>` has no `notifications` method
    let mut it = notifications.blocking_iter(); // NOTE(review): presumably part of the API the adapted example was written for — confirm

    println!("Waiting for notifications...");
    loop {
        let a = it.next();
        match a {
            Ok(Some(b)) => { // a notification was yielded: print its Debug form
                println!("{:?}", b);
            },
            Err(e) => println!("Got error {:?}", e), // errors are printed and the loop continues
            _ => panic!("Unexpected operation!!!") // every other result, i.e. `Ok(None)`, panics

        }

    }
}
warning: unused import: `stream`
 --> src/algotester.rs:2:15
  |
2 | use futures::{stream, StreamExt};
  |               ^^^^^^
  |
  = note: `#[warn(unused_imports)]` on by default

warning: unused imports: `FutureExt`, `TryStreamExt`
 --> src/algotester.rs:3:15
  |
3 | use futures::{FutureExt, TryStreamExt};
  |               ^^^^^^^^^  ^^^^^^^^^^^^

warning: unused import: `tokio::sync::mpsc`
 --> src/algotester.rs:5:5
  |
5 | use tokio::sync::mpsc;
  |     ^^^^^^^^^^^^^^^^^

error[E0599]: no method named `notifications` found for struct `Connection` in the current scope
  --> src/algotester.rs:14:30
   |
14 |     let notifications = conn.notifications();
   |                              ^^^^^^^^^^^^^ method not found in `Connection<Socket, NoTlsStream>`

How to listen for a Psql notification in Rust?

Crossposted: https://users.rust-lang.org/t/listen-for-psql-notification-using-tokio-postgres/105798

sfackler commented 7 months ago

https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/main.rs#L736-L769

NielDuysters commented 7 months ago

My actual code is a bit more complex using async and different threads:

use futures::{stream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use std::env;
use tokio_postgres::{connect, NoTls};
use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() { // Third attempt: await the connection in one task AND poll it for messages in another.
    // PostgreSQL connection.
    let (client, mut connection) = tokio_postgres::connect("host=localhost user=postgres dbname=", NoTls).await.unwrap();

    tokio::spawn(async move { // `async move` takes ownership of `connection` here
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });

    let (tx, rx) = futures_channel::mpsc::unbounded(); // futures channel: its sender is accepted by `forward` below
    let stream =
        stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!("{}", e)); // rejected with E0382: `connection` was already moved into the task above
    let connection = stream.forward(tx).map(|r| r.unwrap()); // shadows `connection` with the forwarding future
    tokio::spawn(connection);

    client
        .batch_execute(
            "LISTEN test_notifications;
             NOTIFY test_notifications, 'hello';
             NOTIFY test_notifications, 'world';",
        )
        .await
        .unwrap();

    drop(client); // NOTE(review): presumably dropping the client lets the connection, and with it the stream, finish — confirm

    let notifications = rx // keep only `Notification` messages and gather them into a Vec
        .filter_map(|m| match m {
            tokio_postgres::AsyncMessage::Notification(n) => futures_util::future::ready(Some(n)),
            _ => futures_util::future::ready(None), // any other message kind is discarded
        })
        .collect::<Vec<_>>()
        .await;
}

Error:

error[E0382]: use of moved value: `connection`
  --> src/algotester.rs:24:25
   |
12 |       let (client, mut connection) = tokio_postgres::con...
   |                    -------------- move occurs because `connection` has type `Connection<Socket, NoTlsStream>`, which does not implement the `Copy` trait
...
15 |       tokio::spawn(async move {
   |  __________________-
16 | |         if let Err(e) = connection.await {
   | |                         ---------- variable moved due to use in generator
17 | |             eprintln!("connection error: {}", e);
18 | |         }
19 | |     });
   | |_____- value moved here
...
24 |           stream::poll_fn(move |cx| connection.poll_mess...
   |                           ^^^^^^^^^ ---------- use occurs due to use in closure
   |                           |
   |                           value used here after move

I tried using Arc and Mutex but when I use tokio's Mutex I can't .await the lock because stream::poll_fn is sync.

sfackler commented 7 months ago

Look at the example I linked. You need to remove


    // Task that awaits `connection` and prints any error it returns to stderr.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });
NielDuysters commented 7 months ago

The problem is that if I don't await the connection, my other code won't work (it compiles, but the queries won't run), because I use that connection to run a lot of SQL queries simultaneously and asynchronously: https://users.rust-lang.org/t/listen-for-psql-notification-using-tokio-postgres/105798/8

I think the best approach would be to create a new instance of the connection for the one function I need to listen for notifications. Is it possible to do that? Could I retrieve the connection parameters from the connection I am awaiting and create a new connection using those parameters?

sfackler commented 7 months ago

connection.poll_message also drives the connection. You do need to ensure that you are consuming the messages it writes to the channel though.

NielDuysters commented 7 months ago

Could you elaborate a bit on what you mean exactly?

sfackler commented 7 months ago

Can you show a self-contained example of some code where the queries don't run?

NielDuysters commented 7 months ago
use futures::{stream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use std::env;
use tokio_postgres::{connect, NoTls};
use std::sync::Arc;
use tokio::sync::Mutex;

// Reproduction of the "queries don't run" problem: the connection is driven by
// a forwarding task, LISTEN/NOTIFY is issued, notifications are collected, and
// only then is the SELECT attempted.
#[tokio::main]
async fn main() {
    // PostgreSQL connection.
    let (client, mut connection) = tokio_postgres::connect("host=localhost user=postgres dbname=", NoTls).await.unwrap();

    // Forward every message polled from the connection into an unbounded
    // futures channel; a connection error panics the forwarding task.
    let (tx, rx) = futures_channel::mpsc::unbounded();
    let stream =
        stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!("{}", e));
    let connection = stream.forward(tx).map(|r| r.unwrap());
    tokio::spawn(connection);

    client
        .batch_execute(
            "LISTEN test_notifications;
             NOTIFY test_notifications, 'hello';
             NOTIFY test_notifications, 'world';",
        )
        .await
        .unwrap();

    // Keep only `Notification` messages and gather them into a Vec.
    // This `.collect().await` only terminates when the connection dies, so
    // execution never reaches the query below while the connection is alive.
    let notifications = rx
        .filter_map(|m| match m {
            tokio_postgres::AsyncMessage::Notification(n) => futures_util::future::ready(Some(n)),
            _ => futures_util::future::ready(None),
        })
        .collect::<Vec<_>>()
        .await;

    // REST OF CODE
    let query =
        client
        .query("
                SELECT btc FROM history LIMIT 1 
            ", &[]).await;

    match query {
        Ok(q) => {
            // NOTE(review): `q[0]` presumably panics when no rows come back — confirm.
            let r = q[0].get::<_, &str>("btc");
            println!("r {}", r);
        },
        // The error value `e` is not reported; the function just returns.
        Err(e) => {
            return;
        }
    }
}

From my question here: https://users.rust-lang.org/t/listen-for-psql-notification-using-tokio-postgres/105798/6?u=oniel

sfackler commented 7 months ago

Your .collect().await call is only going to terminate when the connection dies. The function never progresses to the SELECT btc portion.

NielDuysters commented 7 months ago

Oh okay, so that is the part that stalls my code? Maybe I just need to run

// Keep only `Notification` messages from `rx` and gather them into a Vec.
let notifications = rx
        .filter_map(|m| match m {
            tokio_postgres::AsyncMessage::Notification(n) => futures_util::future::ready(Some(n)),
            _ => futures_util::future::ready(None),
        })
        .collect::<Vec<_>>()
        .await;

on a different thread using tokio::spawn?

sfackler commented 7 months ago

What you do with the notifications is up to you.

NielDuysters commented 7 months ago

I think I solved it thanks to your help:

use futures::{stream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use std::env;
use tokio_postgres::{connect, NoTls};
use std::sync::Arc;
use tokio::sync::Mutex;

/// Listens for PostgreSQL notifications while still running ordinary queries
/// on the same connection.
///
/// One spawned task polls the connection and forwards every asynchronous
/// message into a channel; a second spawned task prints each notification as
/// it arrives. The main task then issues LISTEN/NOTIFY and a sample query.
#[tokio::main]
async fn main() {
    // PostgreSQL connection.
    let (client, mut connection) = tokio_postgres::connect("host=localhost user=postgres dbname=", NoTls).await.unwrap();

    // Make transmitter and receiver. `poll_message` also drives the
    // connection, so this forwarding task takes the place of a separate
    // `connection.await` task. A connection error panics the forwarding task.
    let (tx, mut rx) = futures_channel::mpsc::unbounded();
    let stream =
        stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!("{}", e));
    let connection = stream.forward(tx).map(|r| r.unwrap());
    tokio::spawn(connection);

    // Handle notifications in a separate task, one at a time as they arrive.
    // Collecting them into a `Vec` would only complete once the connection
    // dies and would buffer every notification until then.
    tokio::spawn(async move {
        while let Some(message) = rx.next().await {
            // Messages other than notifications are ignored.
            if let tokio_postgres::AsyncMessage::Notification(n) = message {
                println!("Notification {:?}", n);
            }
        }
    });

    // Execute listen/notify. A failure is reported but does not abort the
    // rest of the program.
    if let Err(e) = client
        .batch_execute(
            "LISTEN test_notifications;
             NOTIFY test_notifications, 'hello';
             NOTIFY test_notifications, 'world';",
        )
        .await
    {
        eprintln!("Error {}", e);
    }

    // Execute random query.
    let query =
        client
        .query("
                SELECT order_id FROM history LIMIT 1 
            ", &[]).await;

    // NOTE: `main` returns right after this match; the tasks spawned above
    // are not awaited.
    match query {
        // `LIMIT 1` can still yield zero rows, so look the first row up
        // instead of indexing `[0]`, which would panic on an empty result.
        Ok(rows) => match rows.first() {
            Some(row) => {
                let r = row.get::<_, &str>("order_id");
                println!("r {}", r);
            }
            None => eprintln!("Query returned no rows"),
        },
        // Report the failure instead of returning silently.
        Err(e) => eprintln!("Error {}", e),
    }
}

I am now able to receive notifications while also querying my database.