ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0

0.19 - Potential Leak in CompositeSubscription with Executor? #1312

Closed daschl closed 10 years ago

daschl commented 10 years ago

Hey folks,

doing performance tests with 0.19 brought up something interesting. Check this out on a 4 GB heap:

[screenshot: 2014-06-04 09:34:22]

96% of my heap is taken up by CompositeSubscription.

I'm running a workload like this. It fetches docs from the server (here, as an example, docs that don't exist, so they are not found and object decoding doesn't interfere with the benchmark).

        List<GetRequest> keys = new ArrayList<GetRequest>(1024);
        for (int i = 0; i < 1024; i++) {
            keys.add(new GetRequest("key" + i, bucket));
        }

        while(true) {
            final CountDownLatch latch = new CountDownLatch(1);
            Observable.from(keys).flatMap(new Func1<GetRequest, Observable<?>>() {
                @Override
                public Observable<GetResponse> call(GetRequest s) {
                    return cluster.send(s);
                }
            }).toList().doOnCompleted(new Action0() {
                @Override
                public void call() {
                    latch.countDown();
                }
            }).subscribe();
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

Looking into CompositeSubscription, the dump makes sense, since it uses a HashSet, which is internally backed by a HashMap.

Another hint might be that they seem to be hanging around in the executor?

[screenshot: 2014-06-04 09:44:16]

Not sure if there is a leak somewhere (I'm assuming so, since 95% are reachable in the heap dump!), but it is causing unbearable pressure on the test system. In case you wonder about the workload, it's around 100k ops/s until it goes nuts because GC takes up all the time.

daschl commented 10 years ago

pinging @akarnokd and @benjchristensen. let me know if you need more info from that heap dump, it's quite large. That thing prevents me from testing the new 0.19 perf improvements since it dominates it all :)

daschl commented 10 years ago

Hmm it could be me scheduling too much stuff, but the hanging around so long doesn't make sense to me. I'll do further investigations and then reopen if I have news.

daschl commented 10 years ago

Okay, reopened this because I think I'm either misusing it completely or there is indeed something wrong with freeing stuff.

Let me give you some context: I'm writing those requests into a netty channel, but if channel.isWritable() returns false (indicating an overload on the writer side), I push the request back into a response RingBuffer.

Now in my handler there I basically say "since I got a request in there, I need to reschedule it so that it will be retried, but let's do it with a backoff, since doing it right now might not have any value."

So what I'm doing is:

        Schedulers.computation().createWorker().schedule(new Action0() {
            @Override
            public void call() {
                cluster.send(request);
            }
        }, 10, TimeUnit.MILLISECONDS);

Which means: after 10 ms, push the request into the request RingBuffer again so it goes through the request path again.

I think this is where all those objects come from, but shouldn't they be cleaned up at some point? Or do I need to do something special to indicate they can be freed? The send method returns immediately.

Any idea on how to approach this differently if I'm misusing the system?

akarnokd commented 10 years ago

You create a worker for each reschedule action which creates a lot of subscriptions to track stuff. If you don't do any other computation there, you could reuse an existing worker, maybe from Schedulers.newThread() so it doesn't disrupt the computation threads that much.

daschl commented 10 years ago

@akarnokd thanks, how do you mean? If I change the code to Schedulers.newThread().createWorker() I immediately run out of memory since it can't create more threads.

akarnokd commented 10 years ago

Cache the worker returned from Schedulers.newThread().createWorker() and use that.

daschl commented 10 years ago

@akarnokd I get more or less the same heap dump: too many subscriptions and hash sets created. Maybe I need to use my own HashedWheelTimer or so to reschedule those ops? Looks like the overhead to track this stuff in RxJava is too much?

daschl commented 10 years ago

@akarnokd but on the other hand, why are they sticking around for so long? If you look at my test case, I wait until the 1024 return, so they went through correctly (which means they should be removed from the worker already?). They are still being kept around.

akarnokd commented 10 years ago

I need a heapdump for further investigation. Could you create a smaller program I could run on my own accompanied by a dump?

daschl commented 10 years ago

@akarnokd I'll try to come up with something self-contained but it might take a while. I'll ping you once I have it.

akarnokd commented 10 years ago

Okay.

One drawback with this HashSet-based composite is that the HashSet grows and never shrinks. When I was researching #1145, many sources indicated that there is no good way to establish a shrink policy on Java collection classes.

However, we could introduce a simple policy in which the HashSet is recreated with default capacity if the composite reaches 0 size. Unfortunately, HashMap doesn't expose the underlying table's capacity, which we would need in order to avoid constant recreation in case the size changes back and forth between 0 and some small value.
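
As a rough illustration of the recreate-on-empty policy described above, here is a minimal sketch. The class name `ShrinkOnEmptySet` and its `drops` counter are hypothetical and exist only for this example; this is not how RxJava's CompositeSubscription is implemented.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; this is not RxJava's CompositeSubscription.
final class ShrinkOnEmptySet<T> {
    private Set<T> items;   // null while the set is empty
    private int drops;      // how many times the backing HashSet was discarded

    synchronized boolean add(T item) {
        if (items == null) {
            items = new HashSet<T>();   // fresh set with default capacity
        }
        return items.add(item);
    }

    synchronized boolean remove(T item) {
        if (items == null) {
            return false;
        }
        boolean removed = items.remove(item);
        if (removed && items.isEmpty()) {
            items = null;   // the grown table becomes eligible for GC
            drops++;
        }
        return removed;
    }

    synchronized int size() {
        return items == null ? 0 : items.size();
    }

    synchronized int drops() {
        return drops;
    }

    public static void main(String[] args) {
        ShrinkOnEmptySet<Integer> set = new ShrinkOnEmptySet<Integer>();
        for (int i = 0; i < 100000; i++) {
            set.add(i);
        }
        for (int i = 0; i < 100000; i++) {
            set.remove(i);
        }
        // prints "size=0 drops=1"
        System.out.println("size=" + set.size() + " drops=" + set.drops());
    }
}
```

The sketch also shows the drawback: if the size flips between 0 and 1, every add allocates a new HashSet, and without access to the table capacity there is no cheap way to tell a small set from one that actually grew.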

benjchristensen commented 10 years ago

That code also is never cleaning up the Worker that is created.

Here is a simple example to show how the Worker needs to be registered for cleanup: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/internal/operators/OperatorRepeat.java#L76

benjchristensen commented 10 years ago

If this is CPU bound then the computation scheduler is the right one. More threads can't help get more CPU time. If it's doing IO then you should use the IO scheduler which caches threads (as opposed to the NewThreadScheduler that creates a new one every time).

daschl commented 10 years ago

@benjchristensen maybe the worker cleanup is the right pointer. This could very well be the issue, since the stuff is sticking around forever and never gets cleaned up.

daschl commented 10 years ago

@benjchristensen sorry I don't get the sample. where do I need to register the worker to be cleaned up?

benjchristensen commented 10 years ago

Here is some code that may point the direction ...

        Worker worker = Schedulers.computation().createWorker();
        worker.schedule(new Action0() {
            @Override
            public void call() {
                // do stuff
            }
        }, 10, TimeUnit.MILLISECONDS);

        // somewhere this needs to be called
        worker.unsubscribe();

        // ObserveOn schedules the unsubscribe: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java#L79

        // here is a Subscriber for registering it
        Subscriber s = new TestSubscriber();
        s.add(worker);
        // when the Subscriber is unsubscribed the Worker will also be shut down
        s.unsubscribe();

The assumption is that your code is using the Scheduler within an operator or Observable chain somewhere, and thus has a Subscriber it can register itself with.

So inside an Operator, you would take the Subscriber you receive and register your worker with it via subscriber.add(worker).

benjchristensen commented 10 years ago

Here is an example of when the Worker is created and registered with a Subscriber: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java#L77

The repeat example also creates the Worker then immediately registers it via Subscriber.add: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/internal/operators/OperatorRepeat.java#L74

daschl commented 10 years ago

@benjchristensen would it make sense to cache the worker and then pass all the calls to this one? Or does this screw up the semantics?

daschl commented 10 years ago

@benjchristensen especially: if I schedule 100 tasks right now, each to be executed in 10ms, would it be like 10ms, 10ms, 10ms, ... one after another, or would all 100 be executed 10ms from now? I think if it's the former, then I need to go with multiple workers, since that would be pretty bad for throughput.

benjchristensen commented 10 years ago

The Worker is a single queue that ties itself to a single thread to ensure sequential execution (the Observable contract). Also, it participates in the subscription lifecycle (unsubscribe).

Since I don't know where you're using the Scheduler, caching it may be okay, but only if it represents a single stream of data and you're okay being pinned to a single thread for everything (generally you would want each Observable to have its own Worker so they are scheduled independently).

See here for how it schedules itself onto an event loop: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/schedulers/EventLoopsScheduler.java#L68

If you're wanting parallel execution, a Scheduler.Worker will never give that. It is exactly the opposite behavior of a Worker whose purpose is to schedule work sequentially and comply with the Observable contract. Parallel execution is done by splitting into multiple Observables (such as .parallel()) and we're considering a ParallelObservable with a small number of operators like map that would be allowed to run concurrently.

If you schedule 100 tasks on a single Worker to be done in 10ms, the worker will wait 10ms then wake up and run the 100 tasks sequentially on the same thread.
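
The same behavior can be reproduced with plain JDK classes. The sketch below is only an analogy: it uses a single-threaded `ScheduledExecutorService` as a stand-in for a Worker, and the class name `SingleWorkerAnalogy` and its `Result` holder are made up for this example. It schedules every task with the same delay and records which thread ran it and in what order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Analogy only: a single-threaded JDK executor standing in for one Worker.
final class SingleWorkerAnalogy {

    static final class Result {
        final List<Integer> order;   // task ids in the order they ran
        final Set<String> threads;   // names of the threads that ran them
        final long elapsedMillis;    // from first schedule() to last task done

        Result(List<Integer> order, Set<String> threads, long elapsedMillis) {
            this.order = order;
            this.threads = threads;
            this.elapsedMillis = elapsedMillis;
        }
    }

    static Result run(int tasks, long delayMillis) {
        ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
        final List<Integer> order = Collections.synchronizedList(new ArrayList<Integer>());
        final Set<String> threads = Collections.synchronizedSet(new HashSet<String>());
        final CountDownLatch done = new CountDownLatch(tasks);
        long start = System.nanoTime();
        try {
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                // Every task gets the same delay, counted from its own schedule() call.
                worker.schedule(new Runnable() {
                    @Override
                    public void run() {
                        order.add(id);
                        threads.add(Thread.currentThread().getName());
                        done.countDown();
                    }
                }, delayMillis, TimeUnit.MILLISECONDS);
            }
            done.await();
            long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            return new Result(order, threads, elapsed);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        Result r = run(100, 10);
        System.out.println("threads used: " + r.threads.size()
                + ", first task: " + r.order.get(0)
                + ", last task: " + r.order.get(r.order.size() - 1)
                + ", elapsed ms: " + r.elapsedMillis);
    }
}
```

All tasks run on one thread, in submission order, and the total time is roughly one delay plus the time to run the tasks, not 100 delays back to back.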

headinthebox commented 10 years ago

To add to @benjchristensen's advice, you typically need not work at the level of schedulers. In fact, I often use http://netflix.github.io/RxJava/javadoc/rx/Observable.html#repeat(rx.Scheduler) to just create a stream of "turns" and continue in regular Rx land.

daschl commented 10 years ago

@headinthebox this part of the code is imperative; it is a handler that gets notified by the LMAX Disruptor. All it has to do is pass the message to another method.

I hope someone could tell me how to use this properly, since I don't use it as part of observables. Currently I have this which doesn't work obviously and still creates massive amounts of garbage (as reported).

    private void scheduleForRetry(final CouchbaseRequest request) {
        final AtomicReference<Subscription> subscription = new AtomicReference<Subscription>();
        subscription.set(worker.schedule(new Action0() {
                @Override
                public void call() {
                    cluster.send(request);
                    subscription.get().unsubscribe();
                }
            }, 10, TimeUnit.MILLISECONDS));
    }

That code doesn't work properly, but I don't see a way to unsubscribe properly once the code has run. The worker is a cached Schedulers.computation().createWorker(). Note that the cluster.send() method is more or less instantaneous; it just publishes into a ringbuffer and is done with it.

benjchristensen commented 10 years ago

That code will only unsubscribe the single action you scheduled ... which is already completing by the time you invoke it. If you want to unsubscribe the entire worker, you need to call worker.unsubscribe().

I think it would help to step back to what you're trying to do. In the original code example where cluster.send is being invoked, is that method blocking? If so you could just use subscribeOn(Schedulers.io()) and it would handle it for you.

Can you provide a sample Gist that is self-contained without Couchbase code that I can run and provide suggestions on?

daschl commented 10 years ago

@benjchristensen okay here is something self-contained.

    @Test
    public void foo() throws Exception {
        Scheduler.Worker worker = Schedulers.io().createWorker();

        while(true) {
            for (int i = 0; i < 1000; i++) {
                worker.schedule(new Action0() {
                    @Override
                    public void call() {

                    }
                }, 10, TimeUnit.MILLISECONDS);
            }
            Thread.sleep(10);
        }
    }

If you run this and use something like YourKit, you can see that old gen is growing like crazy. If you increase the heap size with a higher young ratio it just delays the effect. I think this shows that it's not getting unsubscribed; the question is just what's going on.

Looks like this here:

[screenshot: 2014-06-06 06:48:48]

akarnokd commented 10 years ago

I ran the test above, and what I see on my machine with Java 8 is that OldGen slowly fills up and is not collected at all. What happens is that the ScheduledAction and related objects get promoted to OldGen, but they become unreachable after 10 ms anyway and just use up space until a full GC happens.

daschl commented 10 years ago

@akarnokd correct, do you have any idea what we can do about that?

daschl commented 10 years ago

I know it goes against how GCs expect objects to be used: they stick around just long enough to be promoted to old gen and then need to be collected there. Pretty bad use case.

akarnokd commented 10 years ago

You could increase the YoungGen space size in this particular case so it won't get filled within 10-15ms and objects will probably die before they need to be promoted from survivor space.
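
To check whether a larger young generation actually helps (on HotSpot it is usually sized with -Xmn or -XX:NewRatio), the heap pools can be logged from inside the test without a profiler. The following is a minimal sketch using the standard java.lang.management API; the class name `HeapPoolReport` is made up for this example, and the pool names it prints depend on the collector in use.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reports current usage of each heap memory pool.
final class HeapPoolReport {

    // One line per heap pool (eden, survivor, old gen; names vary by collector).
    static List<String> heapPools() {
        List<String> lines = new ArrayList<String>();
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (pool.getType() != MemoryType.HEAP) {
                continue;   // skip metaspace, code cache, and similar pools
            }
            MemoryUsage usage = pool.getUsage();
            if (usage == null) {
                continue;   // pool is no longer valid
            }
            long usedMb = usage.getUsed() / (1024 * 1024);
            long max = usage.getMax();   // -1 when the maximum is undefined
            String maxText = max < 0 ? "undefined" : (max / (1024 * 1024)) + " MB";
            lines.add(pool.getName() + ": used " + usedMb + " MB, max " + maxText);
        }
        return lines;
    }

    public static void main(String[] args) {
        for (String line : heapPools()) {
            System.out.println(line);
        }
    }
}
```

Calling `heapPools()` every few seconds inside the test loop should show whether the old generation pool keeps growing between collections.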

daschl commented 10 years ago

@akarnokd I also thought about using CMS, since that could help with the GC pauses in the old gen. I'll see what I can tune GC-wise, but ideally we don't generate that much garbage, even if that means I need to go down another route, like using a timer from Netty or so.
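
For illustration, here is roughly what the "separate timer" route could look like with JDK classes only. This is a sketch under assumptions: it uses a single shared `ScheduledThreadPoolExecutor` as a stand-in for a Netty timer, and `RetryTimer`, `scheduleForRetry`, and `demo` are hypothetical names. It still allocates one task object per retry, so it does not remove the garbage, only the per-retry Worker and subscription tracking.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one shared timer thread for all retries.
final class RetryTimer {
    private final ScheduledThreadPoolExecutor timer;
    private final long backoffMillis;

    RetryTimer(long backoffMillis) {
        this.backoffMillis = backoffMillis;
        this.timer = new ScheduledThreadPoolExecutor(1);
        // Remove cancelled tasks from the queue immediately instead of at expiry.
        this.timer.setRemoveOnCancelPolicy(true);
    }

    // Runs the given resend action once, after the configured backoff.
    ScheduledFuture<?> scheduleForRetry(Runnable resend) {
        return timer.schedule(resend, backoffMillis, TimeUnit.MILLISECONDS);
    }

    // Number of retries still waiting in the timer queue.
    int pending() {
        return timer.getQueue().size();
    }

    void shutdown() {
        timer.shutdown();
    }

    // Schedules the given number of no-op retries and returns how many ran.
    static int demo(int retries, long backoffMillis) {
        RetryTimer retryTimer = new RetryTimer(backoffMillis);
        final AtomicInteger resent = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(retries);
        try {
            for (int i = 0; i < retries; i++) {
                retryTimer.scheduleForRetry(new Runnable() {
                    @Override
                    public void run() {
                        resent.incrementAndGet();   // stands in for cluster.send(request)
                        done.countDown();
                    }
                });
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            retryTimer.shutdown();
        }
        return resent.get();
    }

    public static void main(String[] args) {
        System.out.println("retries executed: " + demo(1000, 10));
    }
}
```

Like a cached Worker, this pins all retries to one thread, which seems acceptable here only because the resend itself just publishes into a ring buffer and returns.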

benjchristensen commented 10 years ago

Closing this out as I believe it was solved, but more importantly because 0.20 has changed enough that memory allocation and collection all needs to be profiled again so we should use new issues for it.

Reopen if there is something further to discuss on this thread.

daschl commented 10 years ago

@benjchristensen sounds good, thanks! I think it comes down to how I do the retry queues anyway, and I gotta rework that part entirely. I'll reopen if I find specific other areas that could be improved, cheers.