smol-rs / concurrent-queue

Concurrent multi-producer multi-consumer queue
Apache License 2.0
254 stars 21 forks source link

How to use concurrent-queue with tokio and lambda? #35

Closed RazgrizHsu closed 1 year ago

RazgrizHsu commented 1 year ago

Dear, I want to use Concurrent-Queue in my module,

#[cfg(test)]
mod test
{
    use std::borrow::Borrow;
    use std::fmt::Display;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread::sleep;
    use std::time::Duration;
    use concurrent_queue::ConcurrentQueue;
    use tokio::runtime::Runtime;

    pub fn runConcurrentBy<T,ActProd, ActCons>( threads:i32, actProd:ActProd, actCons:ActCons ) -> (Arc<AtomicBool>,Runtime)
    where ActProd: Fn( &ConcurrentQueue<T> ),
        ActCons: Fn( &T ) + Send + Sync + 'static,
        T: Display + Send + 'static
    {
        let mut q = ConcurrentQueue::unbounded();

        let rtm = Runtime::new().unwrap();
        //
        let end = AtomicBool::new(false);

        let stopAll = Arc::new(end);

        let queue = Arc::new(q);

        for _ in 0..threads
        {
            let stop = stopAll.clone();
            let q = queue.clone();

            rtm.spawn(async move
            {
                while !stop.load( Ordering::Relaxed )
                {
                    if let Ok( val ) = q.pop()
                    {
                        actCons( &val );
                        //println!( "item: {}", val );
                    }
                }
            });
        }

        actProd( &queue );

        (stopAll,rtm)
    }

    #[tokio::test]
    async fn test_concurrentQueue()
    {
        let data = vec![ 1,2,3,4,5,6,7,8,9,10,11 ];

        let (stop,rtm) = runConcurrentBy( 10,
        |q|
        {
            for val in &data
            {
                q.push(val.to_owned()).unwrap();
            }
        },
        |v|
        {
            println!( "item: {}", v );
        }
        );

        sleep( Duration::from_secs_f64(3.0) );

        stop.swap( true, Ordering::Relaxed );
        rtm.shutdown_background();
    }
}

But always get error like..

17 |       pub fn runConcurrentBy<T,ActProd, ActCons>( threads:i32, actProd:ActProd, actCons:ActCons ) -> (Arc<AtomicBool>,Runtime)
   |                                                                                 ------- move occurs because `actCons` has type `ActCons`, which does not implement the `Copy` trait
...
38 | /             {
39 | |                 while !stop.load( Ordering::Relaxed )
40 | |                 {
41 | |                     if let Ok( val ) = q.pop()
42 | |                     {
43 | |                         actCons( &val );
   | |                         ------- use occurs due to use in generator
...  |
46 | |                 }
47 | |             });
   | |_____________^ value moved here, in previous iteration of loop
   |
help: consider borrowing `actCons`
   |
38 |             &{
   |             +

Please teach me hot to fix this, thank you very much

notgull commented 1 year ago

You are moving actCons into the context when you call rtm.spawn. Instead of moving actCons into the context, you should move a reference to it instead. You need to change it from this:

rtm.spawn(async move
{
    while !stop.load( Ordering::Relaxed )
    {
        if let Ok( val ) = q.pop()
        {
            actCons( &val );
            //println!( "item: {}", val );
        }
    }
});

to this:

rtm.spawn({
    // new: move the reference instead of the real value
    let actCons = &actCons;
    async move
    {
    while !stop.load( Ordering::Relaxed )
    {
        if let Ok( val ) = q.pop()
        {
            actCons( &val );
            //println!( "item: {}", val );
        }
    }
    }
});

As an aside, busy waiting on a ConcurrentQueue is generally a bad idea. Instead, you should use async-channel (which is built on top of concurrent-queue) or, since you're already using tokio, use an mpsc channel.

RazgrizHsu commented 1 year ago

@notgull I really appreciate your guidance, After some fix it's worked, Thank You very much 💯

Fixed code..

    use std::borrow::Borrow;
    use std::fmt::Display;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread::sleep;
    use std::time::Duration;
    use concurrent_queue::ConcurrentQueue;
    use tokio::runtime::Runtime;

    pub fn runConcurrentBy<T,ActProd, ActCons>( threads:i32, actProd:ActProd, actCons:ActCons ) -> (Arc<AtomicBool>,Runtime)
    where ActProd: Fn( &ConcurrentQueue<T> ),
        ActCons: Fn( &T ) + Send + Sync + 'static,
        T: Display + Send + 'static
    {
        let mut q = ConcurrentQueue::unbounded();

        let rtm = Runtime::new().unwrap();
        //
        let end = AtomicBool::new(false);

        let stopAll = Arc::new(end);

        let queue = Arc::new(q);

        let actCons = Arc::new( actCons );

        for _ in 0..threads
        {
            let stop = stopAll.clone();
            let q = queue.clone();
            let actCons = actCons.clone();

            rtm.spawn(async move
            {
                while !stop.load( Ordering::Relaxed )
                {
                    if let Ok( val ) = q.pop()
                    {
                        actCons( &val );
                        //println!( "item: {}", val );
                    }
                }
            });
        }

        actProd( &queue );

        (stopAll,rtm)
    }

    #[tokio::test]
    async fn test_concurrentQueue()
    {
        let data = vec![ 1,2,3,4,5,6,7,8,9,10,11 ];

        let (stop,rtm) = runConcurrentBy( 10,
        |q|
        {
            for val in &data
            {
                q.push(val.to_owned()).unwrap();
            }
        },
        |v|
        {
            println!( "item: {}", v );
        }
        );

        sleep( Duration::from_secs_f64(3.0) );

        stop.swap( true, Ordering::Relaxed );
        rtm.shutdown_background();
    }

h :)

I have tried async-channel as you said, But I can't find a good example to handle the data I want, In fact, I will shrink the image in the thread, It will take a while, so I did it this way,

If you have better suggestions, I hope you can give me some pointers, thank you :)

notgull commented 1 year ago

If you have better suggestions, I hope you can give me some pointers, thank you :)

Without really knowing your use case, here's a naive rewrite of your runConcurrentBy function, taking full advantage of async/await syntax and async-channel. I also used rustfmt to make it prettier.

use async_channel::Sender;

pub fn runConcurrentBy<T, ActProd, ActCons>(
    threads: i32,
    actProd: ActProd,
    actCons: ActCons,
) -> Runtime
where
    ActProd: Fn(&Sender<T>),
    ActCons: Fn(&T) + Send + Sync + 'static,
    T: Display + Send + 'static,
{
    let (sender, receiver) = async_channel::<T>::unbounded();

    let rtm = Runtime::new().unwrap();
    let actCons = Arc::new(actCons);

    for _ in 0..threads {
        let q = receiver.clone();
        let actCons = actCons.clone();

        rtm.spawn(async move {
            while let Ok(val) = q.pop().await {
                actCons(&val);
            }
        });
    }

    actProd(&queue);

    rtm
}

If you want to be able to stop your concurrent tasks on demand, the usual option is to have a second channel that you send a signal down that causes the tasks to exit. See this example for this strategy in practice. Alternatively, you could just drop the Sender once you're one with it in actProd.