ayrat555 / fang

Background processing for Rust
https://fang.badykov.com/
MIT License
609 stars, 25 forks

How to process/retrieve the result of a task and how to pass a non-serializable context to the task? #147

Status: Open. aurelien-clu opened this issue 9 months ago

aurelien-clu commented 9 months ago

Hello,

Thank you for your work. :)

I am wondering about the following:

(Feel free to tell me if you would like me to split this into multiple issues.)

For example, I would like to store the result in the database, or pass it to another (in-memory) service without making an API call to localhost when the workers run inside an API. I also don't want to build the client/DB pool from environment variables for every task.

async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), FangError>

Since run returns Result<(), FangError>, I cannot write my own AsyncWorker<AQueue> or AsyncWorkerPool<AQueue> that handles the result of a task.

And since a task must be serializable, I cannot give it clients (database or otherwise) that I don't want to make serializable but that I would want to access in the run function.

I think the only way would be to access a global reference to a singleton or something similar, e.g.:

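async fn run(&self, _queue: &mut dyn AsyncQueueable) -> Result<(), FangError> {
    // do stuff
    let result = ...;
    // get_global_postgres_pool() is my own accessor to a lazily initialized global pool
    let pool: Pool<Postgres> = get_global_postgres_pool().expect("Postgres pool is not initialized");
    // convert sqlx::Error into FangError explicitly instead of relying on `?` to do it
    let to_fang = |e: sqlx::Error| FangError { description: e.to_string() };
    let mut conn = pool.acquire().await.map_err(to_fang)?;
    sqlx::query("<INSERT result into table>").execute(&mut *conn).await.map_err(to_fang)?;
    Ok(())
}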

Though I am not sure that the compiler would allow me to do so.
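A minimal sketch of the global-singleton idea, using only the standard library: std::sync::OnceLock holds a process-wide pool that is set once at startup and read from inside the task body. FakePool, init_global_pool, get_global_pool and run_task are hypothetical stand-ins (a real setup would store something like sqlx's Pool<Postgres> instead). The compiler accepts this pattern as long as the stored type is Send + Sync.

```rust
use std::sync::{Mutex, OnceLock};

// Hypothetical stand-in for a real pool such as sqlx's Pool<Postgres>;
// it only records "inserted" rows in memory.
struct FakePool {
    rows: Mutex<Vec<String>>,
}

impl FakePool {
    fn insert(&self, row: String) {
        self.rows.lock().unwrap().push(row);
    }
}

// Process-wide pool: set once at startup, then read from anywhere.
static POOL: OnceLock<FakePool> = OnceLock::new();

// Hypothetical counterparts of get_global_postgres_pool() from the question.
fn init_global_pool() {
    // `set` returns Err if the pool is already initialized; that is fine here.
    let _ = POOL.set(FakePool { rows: Mutex::new(Vec::new()) });
}

fn get_global_pool() -> Option<&'static FakePool> {
    POOL.get()
}

// Body of a task's run: compute, then persist through the global pool
// instead of holding a (non-serializable) pool as a task field.
fn run_task(input: u32) -> Result<(), String> {
    let result = input * 2;
    let pool = get_global_pool().ok_or("pool is not initialized")?;
    pool.insert(format!("result={result}"));
    Ok(())
}

fn main() {
    assert!(run_task(1).is_err()); // the pool has not been set yet
    init_global_pool();
    run_task(21).unwrap();
    let rows = get_global_pool().unwrap().rows.lock().unwrap().clone();
    println!("{rows:?}"); // prints ["result=42"]
}
```

The trade-off is that every task silently depends on initialization order: if a worker runs before init_global_pool is called, the task can only fail at runtime.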

  1. Did I miss something?
  2. Or do you currently use a workaround to achieve this behavior?
  3. Or is that kind of feature out of scope for fang, and you don't need it?

Maybe this is, or could become, part of the following discussion: https://github.com/ayrat555/fang/discussions/101

I understand that this could considerably complicate the implementation, but there's no harm in asking 😇.

aurelien-clu commented 9 months ago

It seems doable to have a global pool, e.g. https://stackoverflow.com/a/63153898

Still interested in your take on this ;)

PainOchoco commented 6 months ago

Hello, have you figured out how to retrieve the result of a task? Thanks

PainOchoco commented 6 months ago

More specifically, I would like the task to output a stream of data.

PainOchoco commented 6 months ago

It would be nice if we could pass an MPSC sender instance through the task's run method. That would allow streaming data from the senders (workers) to the consumer.

Let me know if there's already a way to do that; I can't find a way to pass a sender instance to the tasks.
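A minimal sketch of what passing a sender through run could look like, assuming a hypothetical RunWithContext trait (this is not fang's AsyncRunnable API). The task itself keeps only serializable data, while each worker owns a WorkerContext holding a std::sync::mpsc::Sender that never goes through the queue. CountTask, WorkerContext and stream_sum are illustrative names, and plain threads stand in for fang's workers.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

// Serializable part of a task: plain data only (in a real queue it would
// derive Serialize/Deserialize).
struct CountTask {
    upto: u32,
}

// Hypothetical non-serializable context owned by each worker; it never
// goes through the queue, so it can hold a channel sender.
struct WorkerContext {
    tx: Sender<u32>,
}

// Hypothetical trait: run receives the worker's context next to the task.
trait RunWithContext {
    fn run(&self, ctx: &WorkerContext) -> Result<(), String>;
}

impl RunWithContext for CountTask {
    fn run(&self, ctx: &WorkerContext) -> Result<(), String> {
        // Stream each partial result to the consumer as soon as it is produced.
        for i in 1..=self.upto {
            ctx.tx.send(i).map_err(|e| e.to_string())?;
        }
        Ok(())
    }
}

// Spawns `workers` threads, each running one CountTask with its own sender
// clone, and sums everything the consumer receives.
fn stream_sum(workers: usize, upto: u32) -> u32 {
    let (tx, rx) = mpsc::channel();
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let ctx = WorkerContext { tx: tx.clone() };
            let task = CountTask { upto };
            thread::spawn(move || task.run(&ctx))
        })
        .collect();
    // Drop the original sender so the receive loop ends when all workers finish.
    drop(tx);
    let total: u32 = rx.iter().sum();
    for handle in handles {
        handle.join().unwrap().unwrap();
    }
    total
}

fn main() {
    // 2 workers, each streaming 1 + 2 + 3
    println!("{}", stream_sum(2, 3)); // prints 12
}
```

Keeping the sender in the worker rather than in the task is what sidesteps the serialization requirement: only CountTask would be stored in the queue.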

pxp9 commented 6 months ago

There is currently no way to pass a non-serializable object to a task.

One workaround is to give the task fields such as database pool settings or a connection URL, both of which are serializable, and have the task store whatever you need in the database. Then add another task that fetches from the database at a fixed interval.
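A minimal sketch of this store-then-poll workaround, using only the standard library. To keep it self-contained, a file path stands in for the connection URL and an appended text file stands in for the results table; SquareTask and fetch_result are hypothetical names. The task opens its own "connection" from a serializable field, and fetch_result is what the periodic polling task would run on each tick.

```rust
use std::fs::{self, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

// Serializable task: plain data only. `store_path` is a hypothetical stand-in
// for the connection URL; a real task would open a DB connection from it.
struct SquareTask {
    job_id: u64,
    input: u64,
    store_path: PathBuf,
}

impl SquareTask {
    // Opens its own "connection" from the serializable field and stores the
    // result instead of returning it.
    fn run(&self) -> io::Result<()> {
        let result = self.input * self.input;
        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.store_path)?;
        writeln!(file, "{} {}", self.job_id, result)
    }
}

// Body of the second, periodic task: look up the stored result for a job id.
// Returns Ok(None) while the result has not been written yet.
fn fetch_result(store_path: &Path, job_id: u64) -> io::Result<Option<u64>> {
    let contents = match fs::read_to_string(store_path) {
        Ok(contents) => contents,
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
        Err(e) => return Err(e),
    };
    Ok(contents.lines().find_map(|line| {
        let (id, value) = line.split_once(' ')?;
        if id.parse::<u64>().ok()? == job_id {
            value.parse::<u64>().ok()
        } else {
            None
        }
    }))
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("fang_results_sketch.txt");
    let _ = fs::remove_file(&path); // start from an empty "table"
    assert_eq!(fetch_result(&path, 7)?, None); // nothing stored yet
    SquareTask { job_id: 7, input: 12, store_path: path.clone() }.run()?;
    println!("{:?}", fetch_result(&path, 7)?); // prints Some(144)
    Ok(())
}
```

The cost of this approach is latency and an extra round trip: the consumer only sees a result on the next polling tick after the task has written it.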

The main problem is that this

async fn run<T>(&self, queue: &mut dyn AsyncQueueable) -> Result<T, FangError>

cannot be implemented yet, I think (I am not sure, because Rust has changed since I implemented it).

The reason is that async traits did not allow generics, at least when we did the implementation.
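A related constraint, independent of async: a trait with a generic method such as run<T> cannot be used as a trait object, which matters if the worker holds tasks as dyn values. A minimal sketch of one way around it, assuming a hypothetical synchronous Task trait rather than fang's actual API: the output is type-erased as Box<dyn Any + Send>, so heterogeneous tasks fit in one Vec<Box<dyn Task>> and the consumer downcasts the result.

```rust
use std::any::Any;

// Hypothetical object-safe task trait: a generic `fn run<T>()` would rule out
// `dyn Task`, so the output is type-erased instead.
trait Task {
    fn run(&self) -> Result<Box<dyn Any + Send>, String>;
}

struct Add {
    a: i64,
    b: i64,
}

struct Greet {
    name: String,
}

impl Task for Add {
    fn run(&self) -> Result<Box<dyn Any + Send>, String> {
        Ok(Box::new(self.a + self.b))
    }
}

impl Task for Greet {
    fn run(&self) -> Result<Box<dyn Any + Send>, String> {
        Ok(Box::new(format!("hello, {}", self.name)))
    }
}

fn main() {
    // Heterogeneous tasks behind trait objects, as a worker pool might hold them.
    let tasks: Vec<Box<dyn Task>> = vec![
        Box::new(Add { a: 40, b: 2 }),
        Box::new(Greet { name: "fang".into() }),
    ];
    for task in &tasks {
        let out = task.run().unwrap();
        // The consumer recovers the concrete type by downcasting.
        if let Some(n) = out.downcast_ref::<i64>() {
            println!("number: {n}"); // prints "number: 42"
        } else if let Some(s) = out.downcast_ref::<String>() {
            println!("text: {s}"); // prints "text: hello, fang"
        }
    }
}
```

Another option in the same spirit would be returning a serialized value (for example a JSON string) that can be written straight to the database.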

pxp9 commented 5 months ago

I have been researching this, and I think this change could be possible as of Rust 1.75.
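Rust 1.75 stabilized native async fn in traits. A minimal sketch of how that could give each task a typed result, assuming a hypothetical TypedTask trait with an associated Output type (not fang's API); an associated type is used so each task declares its own output instead of the caller choosing T. A tiny block_on executor is included only so the example runs without an async runtime.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Minimal single-future executor so the sketch runs without tokio.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until the waker unparks this thread.
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical task trait using native async fn in traits (Rust 1.75+).
trait TypedTask {
    type Output;
    async fn run(&self) -> Result<Self::Output, String>;
}

struct Sum {
    values: Vec<u32>,
}

impl TypedTask for Sum {
    type Output = u32;
    async fn run(&self) -> Result<u32, String> {
        Ok(self.values.iter().sum())
    }
}

fn main() {
    let task = Sum { values: vec![1, 2, 3] };
    let result = block_on(task.run());
    println!("{result:?}"); // prints Ok(6)
}
```

One caveat: a trait with async fn is not dyn-compatible on stable Rust, so a queue that stores tasks as trait objects would still need type erasure or boxed futures on top of this.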

As soon as the SQLite PR and the MySQL PR (which closes #141) are done, I will open an experimental PR exploring this.

sealter commented 5 months ago

Seems doable to have a global pool, e.g. https://stackoverflow.com/a/63153898

Still interested in your take on this ;)

In my view, a global is not a good choice.