pgcentralfoundation / pgrx

Build Postgres Extensions with Rust!
Other
3.54k stars 236 forks source link

Add example of async bgworker #1609

Open AbstractiveNord opened 5 months ago

AbstractiveNord commented 5 months ago

Please add minimal example how to use async runtime, such as Tokio, in bgworker example. I've tried a little, but I have problems with signal handling, which cause postgres to can't stop work.

workingjubilee commented 5 months ago

@AbstractiveNord I have experimented a little with bgworkers but I have never done the thing you describe, so it would have to be contributed.

AbstractiveNord commented 5 months ago

@AbstractiveNord I have experimented a little with bgworkers but I have never done the thing you describe, so it would have to be contributed.

I don't get how to interrupt async infinite loop. My example use block_on method and it's blocks postgres to shutdown. No solution found for now.

workingjubilee commented 5 months ago

@AbstractiveNord It might help if you provided a more complete sample for me to respond to, as I am not unfamiliar with async per se.

AbstractiveNord commented 5 months ago

I've combined pgrx bgworker example and async-nats service example. How I should catch signals inside of async loop? No BackgroundWorker::sigX_recieved() works, no tokio::signal_ctrl_c() works.

#[pg_guard]
#[no_mangle]
pub extern "C" fn background_worker_main(arg: pg_sys::Datum) {
    let arg = unsafe { i32::from_polymorphic_datum(arg, false, pg_sys::INT4OID) };

    BackgroundWorker::attach_signal_handlers(
        SignalWakeFlags::SIGHUP | SignalWakeFlags::SIGTERM | SignalWakeFlags::SIGINT,
    );
    BackgroundWorker::connect_worker_to_spi(Some("postgres"), None);

    log!(
        "Hello from inside the {} BGWorker!  Argument value={}",
        BackgroundWorker::get_name(),
        arg.unwrap()
    );
    let ct = Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            log!(
                "Hello from inside the {} BGWorker! ",
                BackgroundWorker::get_name()
            );
            let client = async_nats::connect("nats://localhost:4222").await.unwrap();
            let service = client
                .add_service(async_nats::service::Config {
                    name: "pg".to_string(),
                    version: "1.0.0".to_string(),
                    description: None,
                    stats_handler: None,
                    metadata: None,
                    queue_group: None,
                })
                .await
                .unwrap();
            let mut endpoint = service.endpoint("get").await.unwrap();
            loop {
                if BackgroundWorker::sigint_received()
                    || BackgroundWorker::sigterm_received()
                {
                    return;
                };
                tokio::select! {
                    _ = tokio::signal::ctrl_c() => {
                        return;
                    }
                    Some(request) = endpoint.next() => {
                        request.respond(Ok("hello".into())).await.unwrap();
                    }
                }
            }
        });

    log!(
        "Goodbye from inside the {} BGWorker! ",
        BackgroundWorker::get_name()
    );
}
AbstractiveNord commented 5 months ago

I've got a result. The problem was hide around select! macro. In fact, I have to put BackgroundWorker::sigX_received inside of select! macro to make example works as expected. Issue stays active because async example looks really interested to present in crate examples folder.