film42 / sidekiq-rs

A port of sidekiq to rust using tokio
MIT License

Proof of concept custom unique hash override #22

Closed: film42 closed this 5 months ago

film42 commented 1 year ago

This allows you to implement a custom unique hash function on the worker trait, so that when a job is enqueued with the unique_for(duration) option, a custom hash will be used instead of the generic sha256(to_json(args)) implementation.
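To illustrate why the scope of the hash matters, here is a minimal sketch comparing a hash over all of the args with a hash over a single field. It uses the standard library's `DefaultHasher` purely as a stand-in so the example has no dependencies; it is not the sha256-over-JSON scheme described above, and `generic_hash`, `custom_hash`, and the struct layout are hypothetical names chosen for the illustration.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical args type mirroring the fields used in this PR's examples.
#[derive(Hash)]
struct CustomerNotification {
    customer_guid: String,
    date_string: String,
}

// Stand-in for the generic hash: every field of the args contributes.
// (Assumption: DefaultHasher replaces sha256(to_json(args)) for brevity.)
fn generic_hash(args: &CustomerNotification) -> String {
    let mut hasher = DefaultHasher::new();
    args.hash(&mut hasher);
    format!("{:x}", hasher.finish())
}

// Stand-in for a custom hash: only the customer GUID contributes.
fn custom_hash(args: &CustomerNotification) -> String {
    let mut hasher = DefaultHasher::new();
    args.customer_guid.hash(&mut hasher);
    format!("{:x}", hasher.finish())
}

fn main() {
    let first = CustomerNotification {
        customer_guid: "CST-123".to_string(),
        date_string: "2023-01-02".to_string(),
    };
    let second = CustomerNotification {
        customer_guid: "CST-123".to_string(),
        date_string: "2023-01-03".to_string(),
    };

    // Same customer, different date: compare both hashing strategies.
    println!(
        "generic hashes equal: {}",
        generic_hash(&first) == generic_hash(&second)
    );
    println!(
        "custom hashes equal: {}",
        custom_hash(&first) == custom_hash(&second)
    );
}
```

With a field-scoped hash, two jobs for the same customer count as duplicates even when other args differ, which is the behavior a custom unique hash is meant to allow.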

It's a little clunky at this point, but I wanted to open a proof of concept PR to give time to think about how this should be implemented.

NOTE: unique_for works fine without a worker trait implementation (for example, for workers implemented in other projects or other languages), but unique_hash_for_args is currently only defined on the worker trait. For workers without a trait implementation, you'll need to specify a custom unique hash when you enqueue the job. Example:

sidekiq::opts()
    .queue("other-app")
    .unique_for(std::time::Duration::from_secs(60))
    .unique_hash_for_args("CUSTOMER-123".to_string())
    .perform_async(
        &mut redis,
        "SomeUniqueWorkerThatLivesSomewhereElse".to_string(),
        CustomerNotification {
            customer_guid: "CST-123".to_string(),
            date_string: date_string.clone(),
        },
    )
    .await?;

It's nice that this will still work, but it's a little clunky.
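As a rough picture of what a uniqueness window keyed by an explicit hash means, the sketch below keeps locks in an in-memory map. This is an assumption-laden illustration, not how sidekiq-rs stores its locks (the real library talks to Redis); `UniqueLocks` and `try_acquire` are hypothetical names.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical in-memory stand-in for a uniqueness lock store.
struct UniqueLocks {
    // Maps a unique hash to the instant at which its lock expires.
    expires_at: HashMap<String, Instant>,
}

impl UniqueLocks {
    fn new() -> Self {
        Self {
            expires_at: HashMap::new(),
        }
    }

    // Returns true if the job may be enqueued, or false if a live lock
    // already exists for this hash. `now` is passed in so the behavior
    // can be exercised without sleeping.
    fn try_acquire(&mut self, unique_hash: &str, unique_for: Duration, now: Instant) -> bool {
        let locked = self
            .expires_at
            .get(unique_hash)
            .map_or(false, |expiry| *expiry > now);
        if locked {
            return false;
        }
        self.expires_at
            .insert(unique_hash.to_string(), now + unique_for);
        true
    }
}

fn main() {
    let mut locks = UniqueLocks::new();
    let start = Instant::now();
    let window = Duration::from_secs(60);

    // First enqueue for this hash takes the lock.
    let first = locks.try_acquire("CUSTOMER-123", window, start);
    // A second enqueue inside the window is treated as a duplicate.
    let second = locks.try_acquire("CUSTOMER-123", window, start + Duration::from_secs(30));
    println!("first accepted: {first}, second accepted: {second}");
}
```

Because the key is just a string, the caller decides what counts as "the same job", independent of the worker name or the full args.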

Otherwise, the worker implementation looks like:

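use async_trait::async_trait;
use sha2::{Digest, Sha256};
use sidekiq::Worker;

// CustomerNotification is the job's args type, defined elsewhere.
#[derive(Clone)]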
struct CustomerSuperUniqueNotificationWorker;

#[async_trait]
impl Worker<CustomerNotification> for CustomerSuperUniqueNotificationWorker {
    fn opts() -> sidekiq::WorkerOpts<CustomerNotification, Self> {
        // Use default options to set the unique_for option by default.
        sidekiq::WorkerOpts::new()
            .queue("customers")
            .unique_for(std::time::Duration::from_secs(30))
    }

    fn unique_hash_for_args(
        args: &CustomerNotification,
    ) -> Result<String, Box<dyn std::error::Error>> {
        Ok(format!("{:x}", Sha256::digest(&args.customer_guid)))
    }

    async fn perform(&self, _args: CustomerNotification) -> Result<(), Box<dyn std::error::Error>> {
        Ok(())
    }
}

Naming isn't superb but it's functional.
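The override mechanism itself can be sketched with a trait that supplies a default method which individual workers may replace. The trait below is a simplified, hypothetical stand-in (`UniqueHash`, `DefaultWorker`, `PerCustomerWorker`, and `unique_key` are not part of sidekiq-rs), and the default uses the args' `Debug` output instead of a real hash to keep the example dependency-free.

```rust
use std::error::Error;
use std::fmt::Debug;

// Hypothetical, simplified trait; not the real sidekiq::Worker definition.
trait UniqueHash<Args: Debug> {
    // Default: derive the key from every field of the args.
    fn unique_hash_for_args(args: &Args) -> Result<String, Box<dyn Error>> {
        Ok(format!("{:?}", args))
    }
}

#[derive(Debug)]
struct CustomerNotification {
    customer_guid: String,
    date_string: String,
}

// Keeps the default behavior by not overriding the method.
struct DefaultWorker;
impl UniqueHash<CustomerNotification> for DefaultWorker {}

// Overrides the default so only the customer GUID identifies the job.
struct PerCustomerWorker;
impl UniqueHash<CustomerNotification> for PerCustomerWorker {
    fn unique_hash_for_args(args: &CustomerNotification) -> Result<String, Box<dyn Error>> {
        Ok(args.customer_guid.clone())
    }
}

// Generic caller: uses whichever implementation the worker type provides.
fn unique_key<W, Args>(args: &Args) -> Result<String, Box<dyn Error>>
where
    W: UniqueHash<Args>,
    Args: Debug,
{
    W::unique_hash_for_args(args)
}

fn main() -> Result<(), Box<dyn Error>> {
    let args = CustomerNotification {
        customer_guid: "CST-123".to_string(),
        date_string: "2023-01-02".to_string(),
    };
    println!("notification date: {}", args.date_string);
    println!("default key: {}", unique_key::<DefaultWorker, _>(&args)?);
    println!("custom key:  {}", unique_key::<PerCustomerWorker, _>(&args)?);
    Ok(())
}
```

Putting the hook on the trait keeps the uniqueness rule next to the worker, at the cost that callers without access to the worker type must pass the hash explicitly at enqueue time.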

film42 commented 5 months ago

Ended up not loving this, so closing.