Diggsey / sqlxmq

Message queue implemented on top of PostgreSQL
Apache License 2.0

Tasks as structs #36

Open imbolc opened 2 years ago

imbolc commented 2 years ago

Have you considered representing tasks as structs holding their context, instead of functions? That way we could avoid explicit context deserialization and the related bugs. E.g. this would be impossible:

struct Foo;
struct Bar;

#[job]
pub async fn foo(job: CurrentJob) -> Result<..> {
    let context: Foo = job.json()?;
}
foo.builder().set_json(&Bar).spawn(&db).await?;

It would be something like:

#[derive(Job)]
struct MyJob;

impl CurrentJob for MyJob {
   fn run(self) -> Result<..> {
   }
}

MyJob { ... }.spawn(&db).await?;
Diggsey commented 2 years ago

It could make sense. There are some questions to answer around how this would integrate with the existing system - would it replace job functions? What if you want to use some other serialization than JSON? etc. It's all solvable but needs some work to prototype and get a good interface.

imbolc commented 2 years ago

With a trait it seems straightforward to make the serialization method overridable:

enum Payload {
   Jsonb(String),
   Bytea(Vec<u8>),
}

trait CurrentJob: serde::Serialize {
   fn serialize(&self) -> Result<Payload> {
       serde_json::to_string(self).map(Payload::Jsonb)
   }
}

would it replace job functions?

At first glance it doesn't seem any less flexible than functions, so why have both?

Diggsey commented 2 years ago

Hmm, it might make more sense to have a builder() trait method, and then change JobBuilder to use Cow so that JobBuilder can be used as the return type. This would help bridge the gap between the two approaches.
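One way to read that: if a hypothetical `JobBuilder` held its fields as `Cow`, both the existing function-style jobs (with `&'static` data) and a struct's `builder()` trait method (with owned, computed data) could return the same type. A std-only sketch with made-up types, not the real sqlxmq API:

```rust
use std::borrow::Cow;

// Hypothetical builder whose job name can be either borrowed or owned.
struct JobBuilder<'a> {
    name: Cow<'a, str>,
}

impl<'a> JobBuilder<'a> {
    fn new(name: impl Into<Cow<'a, str>>) -> Self {
        Self { name: name.into() }
    }
}

// A function-style job can hand out a builder borrowing a static name...
fn foo_builder() -> JobBuilder<'static> {
    JobBuilder::new("foo")
}

// ...while a struct-style job can return one owning a computed name.
trait StructJob {
    fn builder(&self) -> JobBuilder<'static>;
}

struct MyJob {
    tenant: String,
}

impl StructJob for MyJob {
    fn builder(&self) -> JobBuilder<'static> {
        JobBuilder::new(format!("my_job:{}", self.tenant))
    }
}

fn main() {
    assert_eq!(foo_builder().name, "foo");
    assert_eq!(MyJob { tenant: "a".into() }.builder().name, "my_job:a");
}
```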

imbolc commented 2 years ago

Do you mean moving the decision about the serialization method to the job-spawning step? I can't imagine a job that would make sense to serialize differently. So I'd rather have the serialization defined once, at the level of the trait implementation (where it could be derived). Otherwise it could lead to different serializations by mistake.

Diggsey commented 2 years ago

Actually the idea was that other fields on the struct could be utilized when spawning the job, e.g. you could have a field in the struct used as a delay, or as a channel argument. Changing the serialization is also possible, and useful in those circumstances, since the job payload might be only one part of the struct.

Otherwise it could lead to different serializations by mistake.

So could any approach where the user can control the serialization. For example, in your suggested approach you need a matching deserialize method, and that method needs to use the same type for deserializing.

If you really want to reduce the risk of that, you could do something like this:

trait Job {
    type Payload;
    fn serialize_payload(builder: &mut JobBuilder, payload: Self::Payload) { <default impl> }
    fn deserialize_payload(current_job: &mut CurrentJob) -> Result<Self::Payload> { <default impl> }
    fn builder(self) -> JobBuilder;
}

Where the derive would by default produce an implementation where Self::Payload == Self.

(The idea is that builder() would just call the appropriate serialize function)

imbolc commented 2 years ago

Actually the idea was that other fields on the struct could be utilized when spawning the job

Do you mean there would be some required fields for the struct?

For example, in your suggested approach you need a matching deserialize method, and that method needs to use the same type for deserializing

And we'd also need to know which DB type to map the Payload to; maybe an associated const, like `const PAYLOAD_DB_TYPE: PayloadDbType = PayloadDbType::Jsonb;`?
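That could look like an associated const with a default, which stable Rust supports. A small sketch (all names hypothetical):

```rust
// Hypothetical marker for which Postgres column type holds the payload.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PayloadDbType {
    Jsonb,
    Bytea,
}

// Jobs default to JSONB; a job storing raw bytes overrides the const.
trait Job {
    const PAYLOAD_DB_TYPE: PayloadDbType = PayloadDbType::Jsonb;
}

struct JsonJob;
impl Job for JsonJob {}

struct BlobJob;
impl Job for BlobJob {
    const PAYLOAD_DB_TYPE: PayloadDbType = PayloadDbType::Bytea;
}

fn main() {
    assert_eq!(JsonJob::PAYLOAD_DB_TYPE, PayloadDbType::Jsonb);
    assert_eq!(BlobJob::PAYLOAD_DB_TYPE, PayloadDbType::Bytea);
}
```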

FlixCoder commented 2 years ago

This change to structs removes the possibility of custom context arguments though, right? You would provide the context to the struct, if I understood correctly, but that means every spawner needs access to all the required context. I think this is a big downside, as message-based systems usually benefit from a single runner owning the resources and no one else having access, right?

imbolc commented 2 years ago

Do you mean JobRegistry::set_context? Can't we pass the context as an argument to the job:

trait CurrentJob {
   fn run(self, ctx: Context) -> Result;
}
FlixCoder commented 2 years ago

Yeah, that would work; I misunderstood part of the intent then. Of course you would need to extract the context manually from the map.

imbolc commented 2 years ago

you would need to extract context manually from the map

Not necessarily; can't we make the context a generic parameter? Then set_context would just replace it:

struct JobRegistry<C> {
    ...
    context: C
}

impl<C> JobRegistry<C> {
    fn set_context(&mut self, context: C) {
        self.context = context;
    }
}

This way, in the general case, we're statically checked without needing that extraction from the map. But if someone wants to sacrifice type safety for the ability to add fields to the context dynamically, they can still make their context a map themselves.

imbolc commented 2 years ago

Actually the idea was that other fields on the struct could be utilized when spawning the job

So with the job context it would look a bit verbose, but I guess that's the price of type safety:

use std::sync::Arc;

#[derive(Default)]
struct JobRegistry<C> {
    context: Arc<C>,
}

struct JobContext<C> {
    name: String,
    context: Arc<C>,
}

trait CurrentJob<C> {
    fn run(self, ctx: JobContext<C>);
}

impl<C> JobRegistry<C> {
    fn spawn_job(&self, job: impl CurrentJob<C>) {
        job.run(JobContext {
            name: "foo".into(),
            context: self.context.clone(),
        })
    }
}

#[derive(Default)]
struct MyContext;
struct WrongContext;

struct MyJob;

// Trying to implement it with a wrong context
impl CurrentJob<WrongContext> for MyJob {
    fn run(self, _ctx: JobContext<WrongContext>) {}
}

fn main() {
    let r = JobRegistry::<MyContext>::default();

    // Doesn't compile: the trait `CurrentJob<MyContext>` is not implemented for `MyJob`
    r.spawn_job(MyJob);
}
imbolc commented 2 years ago

Also with structs we could address another pain point of the current implementation. Currently we can JobBuilder::spawn a job, but if we forget to add it to JobRegistry::new(&[forgotten_job]) we get a runtime error.

If instead the registry were bound to an enum of jobs, and scheduling were done through the enum rather than on a particular job, there would be no way to schedule a job until you add it to the enum (I removed the job context for clarity; here's the full version):

use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::fmt::Debug;
use std::marker::PhantomData;

struct JobRegistry<J> {
    jobs: PhantomData<J>,
}

trait CurrentJob: Debug + Sized + DeserializeOwned {
    fn run(self) {
        println!("{:?}", self);
    }
}

trait JobList: CurrentJob + Serialize {
    fn schedule(&self) -> String {
        serde_json::to_string(self).unwrap()
    }
}

impl<J> JobRegistry<J>
where
    J: CurrentJob,
{
    fn spawn_from_db(&self, db_data: &str) {
        let job = serde_json::from_str(db_data).unwrap();
        self.spawn_internal(job)
    }

    fn spawn_internal(&self, job: J) {
        job.run()
    }
}

#[derive(Debug, Deserialize, Serialize)]
struct JobFoo;

impl CurrentJob for JobFoo {}

#[derive(Debug, Deserialize, Serialize)]
enum Jobs {
    Foo(JobFoo),
}

// These impls could be probably automated by a macro
impl JobList for Jobs {}
impl CurrentJob for Jobs {
    fn run(self) {
        match self {
            Self::Foo(j) => j.run(),
        }
    }
}

fn main() {
    let r = JobRegistry {
        jobs: PhantomData::<Jobs>,
    };

    // Scheduling is done using job list rather than a particular job,
    // so you can't schedule a job until you add it to the list
    let db_data = Jobs::Foo(JobFoo).schedule();

    r.spawn_from_db(&db_data)
}