Skullabs / kikaha

A fast middleware designed for microservices
https://skullabs.github.io/kikaha/
Apache License 2.0
59 stars 13 forks source link

Infinite size of Threads.asyncJobs #264

Open roneigebert opened 5 years ago

roneigebert commented 5 years ago

I think we have a problem in asyncJobs queue. I see that this queue is used in shutdown, but, executorService.awaitTermination can be a good alternative, right?

@RunWith(KikahaRunner.class)
public class ThreadsTest {

    Threads threads = Threads.elasticPool();

    final AtomicInteger counter = new AtomicInteger(0);
    final int numberOfRunnables = 1000000;

    @Test
    @SneakyThrows
    public void testThreads(){
        int i = 0;
        while ( i++ < numberOfRunnables )
            threads.submit( counter::incrementAndGet );
        Thread.sleep( 2000 );
        val asyncJobsField = threads.getClass().getDeclaredField( "asyncJobs" );
        asyncJobsField.setAccessible( true );
        val asyncJobs = (Queue)asyncJobsField.get( threads );
        assertEquals( numberOfRunnables, counter.get() );
        assertEquals( 0, asyncJobs.size() ); // test fails here - size is 1000000!
    }

}
miere commented 5 years ago

Hey, Ronei, how is it going, mate?

I’m not sure if I properly understand your issue. Can you please give me a bit of context and a step-by-step guide on how to reproduce your problem?

On Fri, 12 Apr 2019 at 2:01 pm, Ronei notifications@github.com wrote:

I think we have a problem in asyncJobs queue. I see that this queue is used in shutdown, but, executorService.awaitTermination can be a good alternative, right?

@RunWith(KikahaRunner.class)public class ThreadsTest {

Threads threads = Threads.elasticPool();

final AtomicInteger counter = new AtomicInteger(0);
final int numberOfRunnables = 1000000;

@Test
@SneakyThrows
public void testThreads(){
    int i = 0;
    while ( i++ < numberOfRunnables )
        threads.submit( counter::incrementAndGet );
    Thread.sleep( 2000 );
    val asyncJobsField = threads.getClass().getDeclaredField( "asyncJobs" );
    asyncJobsField.setAccessible( true );
    val asyncJobs = (Queue)asyncJobsField.get( threads );
    assertEquals( numberOfRunnables, counter.get() );
    assertEquals( 0, asyncJobs.size() ); // test fails here - size is 1000000!
}

}

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/Skullabs/kikaha/issues/264, or mute the thread https://github.com/notifications/unsubscribe-auth/AAf20PcW4l5_O6lMB0B6G-kv48kE8wAbks5vgAUGgaJpZM4crSKD .

roneigebert commented 5 years ago

Hi, I'm fine and you?

I used the Threads.elasticPool() to run user routines in background (thousands of things every hour) and I had memory problems in the app (heap space). I saw the queue is never clean (only on shutdown) and I think this might have favoured my app memory problems (heap space). Currently I changed the code to use a own thread pool.

miere commented 5 years ago

Oh, I see. Fortunately, it isn't exactly a problem on Threads' class design, but on how you should use it. Threads is a simplification over the JDK's Executor Service. It tries to enforce some guaranties, ensuring that all of its child threads will be gracefully terminated.

That said, you should use it only when you have a set of threads that should live (approximately) the time your application runs... like consumers from AWS SQS or RabbitMQ queue, AWS Kinesis or Kafka streams...

If you want a long lived Thread Pool, but you frequently has some bursts - like parallel executions of HTTP requests, you should use the background API https://gist.github.com/miere/307c330af0897f50b3232e41654182c1. On this example if waits for the termination of all parallel threads to proceed. The background API also have the await method, that can be called manually to wait for messages that still being processed and then release it from pool once finished (the same happens if you call close, hence you can put it in a try-with-resource block).

The Background API is quite flexible (covering 2 scenarios above described) and should be the preferred way to use this class. If none of the 3 cases above matches your requirements, I think I'll need more details to proper understand how the Threads API can be useful for you.

On Mon, Apr 15, 2019 at 10:36 AM Ronei notifications@github.com wrote:

Hi, I'm fine and you?

I used the Threads.elasticPool() to run user routines in background (thousands of things every hour) and I had memory problems in the app (heap space). I saw the queue is never clean (only on shutdown) and I think this might have favoured my app memory problems (heap space). Currently I changed the code to use a own thread pool.

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/Skullabs/kikaha/issues/264#issuecomment-483073266, or mute the thread https://github.com/notifications/unsubscribe-auth/AAf20NgApA5ivFN3UiY0jZCTNlEN4iL6ks5vg8mIgaJpZM4crSKD .

-- / Miere L. Teixeira /

miere commented 3 years ago

@roneigebert any thoughts?

roneigebert commented 3 years ago

Hi @miere! Well, I created a custom thread pool to solve this problem. But, if you agree we can improove the "kikaha threads" to clean everything after the thread execution with a TRY-FINALLY block

miere commented 3 years ago

Sounds good to me, mate. Will be waiting for you PR.

On Mon, Feb 8, 2021 at 11:44 AM Ronei notifications@github.com wrote:

Hi @miere https://github.com/miere! Well, I created a custom thread pool to solve this problem. But, if you agree we can improove the "kikaha threads" to clean everything after the thread execution with a TRY-FINALLY block

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/Skullabs/kikaha/issues/264#issuecomment-774801871, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAD7NUEIDXR4V57MPWWBW7LS54XWLANCNFSM4HFNEKBQ .

-- / Miere L. Teixeira /