film42 / sidekiq-rs

A port of sidekiq to rust using tokio
MIT License

Ruby and rust examples together #11

Closed: film42 closed this 8 months ago

film42 commented 2 years ago

This adds two projects "rustic" and "rubies".

The rustic project enqueues jobs for the rubies project to consume using the "yolo_app" namespace.

The rubies project enqueues jobs for the rustic project to consume, using the same "yolo_app" namespace.

Both seem to be working.

@justmark, would you mind comparing this with your code? I can get them to talk to one another just fine.
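The two projects only see each other's jobs because they agree on the "yolo_app" namespace. A minimal sketch of how the namespaced Redis keys line up, assuming that both redis-namespace on the Ruby side and sidekiq-rs's `with_custom_namespace` apply the namespace as a plain `<namespace>:` prefix, and relying on Sidekiq storing each queue as a Redis list at `queue:<name>`. `queue_key` and `shares_queue` are hypothetical helpers, not part of either library.

```rust
/// Hypothetical helper: the Redis key a namespaced Sidekiq queue lives under.
/// Assumption: the namespace is applied as a plain `<namespace>:` prefix.
fn queue_key(namespace: &str, queue: &str) -> String {
    // Sidekiq keeps each queue as a Redis list named `queue:<name>`.
    format!("{}:queue:{}", namespace, queue)
}

/// Two processes exchange jobs on `queue` only if both resolve it to the same key.
fn shares_queue(producer_ns: &str, consumer_ns: &str, queue: &str) -> bool {
    queue_key(producer_ns, queue) == queue_key(consumer_ns, queue)
}
```

With mismatched namespaces the producer writes to a list the consumer never polls, so this is a cheap thing to rule out first.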

justmark commented 2 years ago

I'm running an older version of Sidekiq. I'm upgrading it now (and that broke things in the process). I'll get to this once I have it all sorted out.

justmark commented 2 years ago

Ok, back in service. Checking this out now.

justmark commented 2 years ago

Your examples worked perfectly. I'll see if I can figure out where I messed up. Apologies for wasting your time.

film42 commented 2 years ago

Not a waste at all! It helped me catch a bug! https://github.com/film42/sidekiq-rs/pull/12

justmark commented 2 years ago

Actually ... I spoke too soon.

When I run the Rust side of your app, I'm only getting the jobs that were created by Ruby. The Rust-created ones aren't getting processed. This (I think) is the issue I'm experiencing: I'm not getting the temp.house.basement or temp.house.garage metrics, only the others. Here is my output:

Jul 16 17:29:29.985 INFO sidekiq, jid: eb58bfca6f3e75cd44272d73, queue: rust:v1_statistics, class: V1StatisticsWorker, status: start
Jul 16 17:29:29.985 INFO sidekiq, jid: 13e11fcf09706dfcb5659db1, queue: rust:v2_statistics, class: V2::StatisticsWorker, status: start
Jul 16 17:29:29.985 INFO Got a metric (v1), metric: Stats { metric: "temp.outside", value: 10.01 }
Jul 16 17:29:29.985 INFO sidekiq, jid: c03f00f798e791277432dfeb, queue: rust:v1_statistics, class: V1StatisticsWorker, status: start
Jul 16 17:29:29.985 INFO Got a metric (v2), metric: Stats { metric: "temp.inside", value: 20.02 }
Jul 16 17:29:29.985 INFO sidekiq, jid: eb58bfca6f3e75cd44272d73, queue: rust:v1_statistics, class: V1StatisticsWorker, status: done, elapsed: 209.259µs
Jul 16 17:29:29.985 INFO sidekiq, jid: 1863e9ab0ac5ce63ab4bceaf, queue: rust:v2_statistics, class: V2::StatisticsWorker, status: start
Jul 16 17:29:29.985 INFO sidekiq, jid: 13e11fcf09706dfcb5659db1, queue: rust:v2_statistics, class: V2::StatisticsWorker, status: done, elapsed: 181.059µs
Jul 16 17:29:29.985 INFO sidekiq, jid: 2c76489f7a75381eebc1f9d7, queue: rust:v2_statistics, class: V2::StatisticsWorker, status: start
Jul 16 17:29:29.985 INFO Got a metric (v1), metric: Stats { metric: "temp.outside", value: 10.01 }
Jul 16 17:29:29.985 INFO Got a metric (v2), metric: Stats { metric: "temp.inside", value: 20.02 }
Jul 16 17:29:29.985 INFO sidekiq, jid: 342290d7246ef7a50821677f, queue: rust:v1_statistics, class: V1StatisticsWorker, status: start
Jul 16 17:29:29.985 INFO sidekiq, jid: c03f00f798e791277432dfeb, queue: rust:v1_statistics, class: V1StatisticsWorker, status: done, elapsed: 249.33µs
Jul 16 17:29:29.985 INFO sidekiq, jid: 1863e9ab0ac5ce63ab4bceaf, queue: rust:v2_statistics, class: V2::StatisticsWorker, status: done, elapsed: 201µs
Jul 16 17:29:29.985 INFO Got a metric (v2), metric: Stats { metric: "temp.inside", value: 20.02 }
Jul 16 17:29:29.985 INFO Got a metric (v1), metric: Stats { metric: "temp.outside", value: 10.01 }
Jul 16 17:29:29.985 INFO sidekiq, jid: 2c76489f7a75381eebc1f9d7, queue: rust:v2_statistics, class: V2::StatisticsWorker, status: done, elapsed: 235.57µs
Jul 16 17:29:29.985 INFO sidekiq, jid: 342290d7246ef7a50821677f, queue: rust:v1_statistics, class: V1StatisticsWorker, status: done, elapsed: 325.02µs

film42 commented 2 years ago

I made rustic enqueue jobs for the Ruby process, and Ruby enqueues jobs for Rust. Otherwise it was hard to confirm that they were actually cooperating.

justmark commented 2 years ago

Totally missed that. I modified the Rust code so that it both generates jobs for Rust and consumes them, and that worked fine. So I definitely have an issue on my side that is causing this problem.

justmark commented 2 years ago

I'm a bit closer, albeit somewhat confused. I am pushing jobs with Rust, and they're showing up under Retries rather than in their queue. I have a separate Rust process that reads from the queue, and it gets the jobs when the Sidekiq server retries them. I remember reading about someone having this issue; I'll see if I can find it, and hopefully that'll shed some light.
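Jobs that fail inside a Sidekiq server are not dropped: they go into a Redis sorted set named `retry`, scored by the time of the next attempt, and a poller moves due jobs back onto their original queue. That would explain why the separate Rust process only receives the jobs after a retry fires. Below is a small in-memory model of that flow, not Sidekiq's code; `Job`, `RetrySet`, and `requeue_due` are illustrative names, and `retry_delay_secs` mirrors Sidekiq 6's default backoff of `(count ** 4) + 15 + rand(10) * (count + 1)` seconds with the random part passed in as `jitter`.

```rust
use std::collections::{BTreeMap, HashMap, VecDeque};

/// Minimal job record; real Sidekiq jobs are JSON hashes with many more fields.
#[derive(Debug, Clone)]
struct Job {
    class: String,
    queue: String,
    /// Failed attempts so far (illustrative; not Sidekiq's `retry_count` field).
    failures: u64,
}

/// Sidekiq 6's default backoff in seconds; `jitter` stands in for `rand(10)` (0..=9).
fn retry_delay_secs(count: u64, jitter: u64) -> u64 {
    count.pow(4) + 15 + jitter * (count + 1)
}

/// In-memory stand-in for the `retry` sorted set: score = next attempt time (secs).
#[derive(Default)]
struct RetrySet {
    by_time: BTreeMap<u64, Vec<Job>>,
}

impl RetrySet {
    /// Record a failed job and schedule its next attempt.
    fn schedule(&mut self, now: u64, mut job: Job, jitter: u64) {
        let at = now + retry_delay_secs(job.failures, jitter);
        job.failures += 1;
        self.by_time.entry(at).or_default().push(job);
    }

    /// Move every job due at or before `now` back onto its original queue,
    /// where any process fetching that queue may pick it up.
    fn requeue_due(&mut self, now: u64, queues: &mut HashMap<String, VecDeque<Job>>) -> usize {
        // split_off leaves keys <= now in `self.by_time` and returns the later ones.
        let later = self.by_time.split_off(&(now + 1));
        let due = std::mem::replace(&mut self.by_time, later);
        let mut moved = 0;
        for (_, jobs) in due {
            for job in jobs {
                queues.entry(job.queue.clone()).or_default().push_back(job);
                moved += 1;
            }
        }
        moved
    }
}
```

In this model a job that failed at t=100 with zero jitter sits in the retry set until t=115, then reappears on its queue, which matches the "only shows up after a retry" symptom.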

film42 commented 2 years ago

Thanks! It might be something simple. Which sidekiq.rb version are you using? Could that be a contributing factor? You're also welcome to send me any code via email if you'd prefer not to post it publicly (film42 @ gmail).

justmark commented 2 years ago

Hi,

My Gem versions:

sidekiq (6.5.1)
sidekiq-ent (2.5.1)
sidekiq-pro (5.5.1)

I added the rust:v1_statistics and rust:v2_statistics queues to the Sidekiq::Web interface so I could monitor them. I removed the worker so that jobs would just get enqueued. What I saw matched what I had been experiencing: jobs were showing up in the retry list with the error NameError: uninitialized constant V1StatisticsWorker

This is the code I'm using. I removed my auth/password :)

use async_trait::async_trait;
use bb8::Pool;
use serde::{Deserialize, Serialize};
use sidekiq::{
    periodic, ChainIter, Job, Processor, RedisConnectionManager, RedisPool, ServerMiddleware,
    ServerResult, Worker, WorkerRef,
};
use slog::{debug, error, info, o, Drain};
use std::sync::Arc;

mod v2 {
    use super::*;

    pub struct StatisticsWorker {
        pub logger: slog::Logger,
    }

    #[async_trait]
    impl Worker<Stats> for StatisticsWorker {
        async fn perform(&self, args: Stats) -> Result<(), Box<dyn std::error::Error>> {
            info!(self.logger, "Got a metric (v2)"; "metric" => format!("{:?}", args));

            Ok(())
        }

        // Set the default queue
        fn opts() -> sidekiq::WorkerOpts<Stats, Self>
        where
            Self: Sized,
        {
            sidekiq::WorkerOpts::new().queue("rust:v2_statistics")
        }

        // Set the default class name
        fn class_name() -> String
        where
            Self: Sized,
        {
            "V2::StatisticsWorker".to_string()
        }
    }
}

#[derive(Clone)]
struct V1StatisticsWorker {
    logger: slog::Logger,
}

#[derive(Debug, Serialize, Deserialize)]
struct Stats {
    metric: String,
    value: f64,
}

#[async_trait]
impl Worker<Stats> for V1StatisticsWorker {
    async fn perform(&self, args: Stats) -> Result<(), Box<dyn std::error::Error>> {
        info!(self.logger, "Got a metric (v1)"; "metric" => format!("{:?}", args));

        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let decorator = slog_term::PlainSyncDecorator::new(std::io::stdout());
    let drain = slog_term::FullFormat::new(decorator).build().fuse();
    let logger = slog::Logger::root(drain, o!());

    let manager = RedisConnectionManager::new("redis://:auth@ip:6379/0")?;
    let mut redis = Pool::builder()
        .connection_customizer(sidekiq::with_custom_namespace("sidekiq".to_string()))
        .build(manager)
        .await?;

    tokio::spawn({
        let mut redis = redis.clone();

        async move {
            loop {
                println!("Inserting...");
                V1StatisticsWorker::opts()
                    .queue("rust:v1_statistics")
                    .perform_async(
                        &mut redis,
                        Stats {
                            metric: "temp.house.basement".into(),
                            value: 12.2,
                        },
                    )
                    .await
                    .unwrap();

                v2::StatisticsWorker::opts()
                    .perform_async(
                        &mut redis,
                        Stats {
                            metric: "temp.house.garage".into(),
                            value: 13.37,
                        },
                    )
                    .await
                    .unwrap();

                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            }
        }
    });

    tokio::time::sleep(std::time::Duration::from_secs(30)).await;
    Ok(())
}
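The `NameError: uninitialized constant V1StatisticsWorker` comes from the Ruby side: a Sidekiq server turns the job's `"class"` string back into a Ruby constant before running it, so whichever server pops the job must define that class. In sidekiq-rs the string comes from `class_name()`, overridden to `"V2::StatisticsWorker"` above, while the logs earlier in this thread show `V1StatisticsWorker` falling back to the struct name. The sketch below models that lookup with a plain registry; `Registry`, `dispatch`, and `log_stats` are hypothetical and only illustrate why a job lands in retries when the process that pops it has no matching class.

```rust
use std::collections::HashMap;

/// A handler receives the job's raw JSON args (hypothetical signature).
type Handler = fn(&str) -> Result<(), String>;

/// Hypothetical stand-in for "job classes this process knows about".
struct Registry {
    handlers: HashMap<String, Handler>,
}

impl Registry {
    fn new() -> Self {
        Registry { handlers: HashMap::new() }
    }

    fn register(&mut self, class_name: &str, handler: Handler) {
        self.handlers.insert(class_name.to_string(), handler);
    }

    /// Look up the job's `class` string. An unknown name fails the way Ruby's
    /// constant lookup does; in Sidekiq that failure sends the job to retries.
    fn dispatch(&self, class_name: &str, args_json: &str) -> Result<(), String> {
        match self.handlers.get(class_name) {
            Some(handler) => handler(args_json),
            None => Err(format!("NameError: uninitialized constant {}", class_name)),
        }
    }
}

/// Example handler that accepts any payload.
fn log_stats(_args_json: &str) -> Result<(), String> {
    Ok(())
}
```

The same payload succeeds on a process that registered the class and fails on one that did not, so the producer is rarely at fault for this particular error.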

film42 commented 2 years ago

Is it possible that a different app or process is listening on that same queue without a definition of that worker in its Ruby code? If you stopped all the workers so that jobs are only enqueued and never processed, and they still go directly to the retry queue, then a Sidekiq server is likely still processing that queue.

justmark commented 2 years ago

This queue is only used by the Rust process that creates the jobs and a second process that consumes them. There isn't anything Ruby-wise associated with it.

film42 commented 2 years ago

I have access to a sidekiq-ent license, so I added it to the demo Ruby app.rb in this PR, and Ruby still consumes just fine. Sidekiq Web pointed at the yolo_app namespace shows the Ruby app consuming from the queues ruby:v1_statistics and ruby:v2_statistics, and I can see the work complete successfully.

require "redis-namespace"
require "sidekiq"
require "sidekiq-ent"

Sidekiq.configure_server do |config|
  config.redis = {
    url: "redis://127.0.0.1:6379",
    namespace: "yolo_app",
  }
end

Sidekiq.configure_client do |config|
  config.redis = {
    url: "redis://127.0.0.1:6379",
    namespace: "yolo_app",
  }
end

module V2
  class StatisticsWorker
    include Sidekiq::Worker
    sidekiq_options queue: "ruby:v2_statistics"

    def perform(payload)
      puts "Received a payload (v2): #{payload}"
    end
  end
end

class V1StatisticsWorker
  include Sidekiq::Worker
  sidekiq_options queue: "ruby:v1_statistics"

  def perform(payload)
    puts "Received a payload (v1): #{payload}"
  end
end

Thread.new do
  loop do
    V1StatisticsWorker.
      set(:queue => "rust:v1_statistics").
      perform_async({"metric" => "temp.outside", "value" => 10.01})

    V2::StatisticsWorker.
      set(:queue => "rust:v2_statistics").
      perform_async({"metric" => "temp.inside", "value" => 20.02})

    sleep 5
  end
end

Running via:

bundle exec sidekiq -q ruby:v1_statistics -q ruby:v2_statistics -r ./app.rb
2022-07-17T00:56:21.675Z pid=101652 tid=24qw class=V1StatisticsWorker jid=735ddfa7058cecc3e56b73e7 INFO: start
Received a payload (v1): {"metric"=>"temp.house.basement", "value"=>12.2}
2022-07-17T00:56:21.675Z pid=101652 tid=24qw class=V1StatisticsWorker jid=735ddfa7058cecc3e56b73e7 elapsed=0.0 INFO: done

justmark commented 2 years ago

If I create the job in Ruby, and consume it in Rust, then everything works fine. My issue is creating in Rust is causing this NameError to occur.

So Ruby->Rust has worked fine, but Rust->Rust hasn't. Something in Rust job creation seems to be the issue, since my Rust consumer is the same in both cases.

justmark commented 2 years ago

Morning.

I went back to the drawing board, and I think things are a lot closer, but I can easily reproduce something strange. I decided to try things out using a queue name and worker name that hadn't been used before. The queue is called some_queue, and my Sidekiq server doesn't know anything about it: I didn't start Sidekiq with this queue defined in the config or via the -q switch.

I then ran the following Rust code which uses a namespace called sidekiq:

use async_trait::async_trait;
use bb8::Pool;
use serde::{Deserialize, Serialize};
use sidekiq::{
    periodic, ChainIter, Job, Processor, RedisConnectionManager, RedisPool, ServerMiddleware,
    ServerResult, Worker, WorkerRef,
};
use slog::{debug, error, info, o, Drain};
use std::sync::Arc;

#[derive(Clone)]
struct SomeWorker {
    logger: slog::Logger,
}

#[derive(Debug, Serialize, Deserialize)]
struct Stats {
    metric: String,
    value: f64,
}

#[async_trait]
impl Worker<Stats> for SomeWorker {
    async fn perform(&self, args: Stats) -> Result<(), Box<dyn std::error::Error>> {
        info!(self.logger, "Got a metric (v1)"; "metric" => format!("{:?}", args));

        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let decorator = slog_term::PlainSyncDecorator::new(std::io::stdout());
    let drain = slog_term::FullFormat::new(decorator).build().fuse();
    let logger = slog::Logger::root(drain, o!());

    let manager = RedisConnectionManager::new("redis://:auth@ip:6379/0")?;
    let mut redis = Pool::builder()
        .connection_customizer(sidekiq::with_custom_namespace("sidekiq".to_string()))
        .build(manager)
        .await?;

    tokio::spawn({
        let mut redis = redis.clone();

        async move {
            loop {
                SomeWorker::opts()
                    .queue("some_queue")
                    .perform_async(
                        &mut redis,
                        Stats {
                            metric: "temp.house.basement".into(),
                            value: 12.2,
                        },
                    )
                    .await
                    .unwrap();

                println!("added job to some_queue");
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            }
        }
    });

    tokio::time::sleep(std::time::Duration::from_secs(60)).await;
    Ok(())
}

This pushed 12 jobs to some_queue. I then went to my Sidekiq Web, and no some_queue was listed.

I then ran the following Ruby code, which inserts a single job into some_queue:

require "redis-namespace"
require "sidekiq"

Sidekiq.configure_server do |config|
  config.redis = {
    url: "redis://127.0.0.1:6379",
    namespace: "sidekiq",
    password: 'auth'
  }
end

Sidekiq.configure_client do |config|
  config.redis = {
    url: "redis://127.0.0.1:6379",
    namespace: "sidekiq",
    password: 'auth'
  }
end

class SomeWorker
  include Sidekiq::Worker
  sidekiq_options queue: "some_queue"

  def perform(payload)
    puts "Received a payload (v1): #{payload}"
  end
end

SomeWorker.perform_async({"metric" => "temp.outside", "value" => 10.01})

I then went to my Sidekiq Web interface, and some_queue was now listed, with 13 jobs. When I checked the jobs, they had all been created correctly.

Any idea why queues don't show up when their jobs are only created by Rust, but do show up as soon as Ruby arrives on the scene?

justmark commented 2 years ago

To add a bit more confusion:

I then added the queue name to my sidekiq.yml file and restarted the server. Jobs then started going to Retries. When I removed the queue name from the .yml file again, the queue worked properly!
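This fits film42's earlier guess: listing some_queue in sidekiq.yml makes the Ruby server fetch from it as well, so it competes with the Rust consumer for every job, and the Ruby server has no class for those jobs (the `SomeWorker` in the enqueue script is never loaded by the server). A small model of "who can pop this queue"; `Process`, `competing_consumers`, and `failing_consumers` are hypothetical names, and the model ignores Sidekiq's queue ordering and weights.

```rust
/// A worker process, the queues it fetches from (`-q` flags or sidekiq.yml),
/// and the job classes it can actually run.
struct Process<'a> {
    name: &'a str,
    queues: &'a [&'a str],
    classes: &'a [&'a str],
}

/// Every process that may pop a job from `queue`; each one competes for it.
fn competing_consumers<'a>(queue: &str, processes: &[Process<'a>]) -> Vec<&'a str> {
    processes
        .iter()
        .filter(|p| p.queues.iter().any(|q| *q == queue))
        .map(|p| p.name)
        .collect()
}

/// Consumers that would pop the job but not find its class (NameError, then retry).
fn failing_consumers<'a>(queue: &str, class: &str, processes: &[Process<'a>]) -> Vec<&'a str> {
    processes
        .iter()
        .filter(|p| p.queues.iter().any(|q| *q == queue))
        .filter(|p| !p.classes.iter().any(|c| *c == class))
        .map(|p| p.name)
        .collect()
}
```

Removing the queue from sidekiq.yml empties the "failing" list in this model, which is consistent with the queue working again.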

film42 commented 2 years ago

Sidekiq Web relies on a stats component that I haven't implemented yet. We implemented it in Go at work before, so it shouldn't be too bad: https://github.com/mxenabled/go-workers2/tree/b93765eec7bc54a81e08166362485103cb890873
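For reference, the Ruby server keeps its dashboard totals in plain Redis counters: as far as I know, Sidekiq 6 increments `stat:processed` for every finished job (failed ones included) and `stat:failed` for failures, plus per-day keys such as `stat:processed:2022-07-17`. A minimal in-memory sketch of that bookkeeping, with a `HashMap` standing in for Redis and the namespace prefix left out; `StatsStore`, `record`, and `get` are hypothetical names, and the key layout should be checked against Sidekiq's source or the go-workers2 code before relying on it.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the Redis counters Sidekiq Web reads.
#[derive(Default)]
struct StatsStore {
    counters: HashMap<String, u64>,
}

impl StatsStore {
    /// Record one finished job. `date` is an ISO date such as "2022-07-17".
    fn record(&mut self, failed: bool, date: &str) {
        // Every finished job counts as processed, whether it failed or not.
        self.bump("processed", date);
        if failed {
            self.bump("failed", date);
        }
    }

    /// Increment both the all-time and the per-day counter for `kind`.
    fn bump(&mut self, kind: &str, date: &str) {
        let all_time = format!("stat:{}", kind);
        let daily = format!("stat:{}:{}", kind, date);
        for key in vec![all_time, daily] {
            *self.counters.entry(key).or_insert(0) += 1;
        }
    }

    /// Read a counter; missing keys read as 0, like GET on an absent Redis key.
    fn get(&self, key: &str) -> u64 {
        self.counters.get(key).copied().unwrap_or(0)
    }
}
```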

I think we can start with the queue size first and then later add the active processing status metrics.
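The missing queue in Sidekiq Web is likely about registration rather than stats: as I understand it, Sidekiq's Ruby client runs `SADD queues <name>` alongside `LPUSH queue:<name> <payload>`, and Sidekiq Web lists only the queues found in that `queues` set, taking each size from `LLEN`. That would explain why the 12 Rust-pushed jobs in the some_queue experiment only became visible once a single Ruby push registered the queue. The in-memory model below contrasts the two push styles; `FakeRedis` and its methods are hypothetical, not the API of any Redis client crate.

```rust
use std::collections::{BTreeSet, HashMap, VecDeque};

/// Hypothetical in-memory stand-in for the few Redis structures involved.
#[derive(Default)]
struct FakeRedis {
    /// The `queues` set that Sidekiq Web reads to build its queue list.
    queues: BTreeSet<String>,
    /// `queue:<name>` lists holding JSON payloads.
    lists: HashMap<String, VecDeque<String>>,
}

impl FakeRedis {
    /// LPUSH only: the job is stored, but the queue is never registered.
    fn push_unregistered(&mut self, queue: &str, payload: &str) {
        self.lists
            .entry(format!("queue:{}", queue))
            .or_default()
            .push_front(payload.to_string());
    }

    /// Ruby-client style: SADD queues <name>, then LPUSH queue:<name>.
    fn push_registered(&mut self, queue: &str, payload: &str) {
        self.queues.insert(queue.to_string());
        self.push_unregistered(queue, payload);
    }

    /// What a dashboard built on the `queues` set would show: (name, LLEN).
    fn visible_queues(&self) -> Vec<(String, usize)> {
        self.queues
            .iter()
            .map(|q| {
                let len = self.lists.get(&format!("queue:{}", q)).map_or(0, |l| l.len());
                (q.clone(), len)
            })
            .collect()
    }
}
```

`SADD` on an existing member is a no-op, so registering the queue on every push costs little.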

justmark commented 2 years ago

OK! In the meantime, I have a small Ruby script that makes the queue visible in Sidekiq::Web. It seems I only need it if the queue gets deleted.

film42 commented 2 years ago

Nice! I just published a change that makes the queues appear in Sidekiq Web. Processor stats will take longer to be ready.

justmark commented 2 years ago

That's great. I've already updated to 0.6.2 :)

film42 commented 8 months ago

Closing for now. This can serve as a reference for future changes, but it's clearly not needed.