davidmoten / rxjava-file

RxJava observables for files including NIO events
Apache License 2.0
82 stars 17 forks source link

Watcher lives on after all subscribers have unsubscribed #18

Closed RickeyWard closed 5 years ago

RickeyWard commented 5 years ago

After

                    Observable<String> items =  FileObservable.tailer()
                   .file("/test.log")
                   .startPosition(0)
                   .sampleTimeMs(500)
                   .chunkSize(8192)
                   .utf8()
                   .tailText();
                   Subscription subItem = items.subscribeOn(Schedulers.newThread()).subscribe(s->{
                        System.out.println(s);
                   });
                   subItem.unsubscribe();

A thread named RxIoScheduler-1 (Evictor) seems to live on as long as the process is open instead of dying off because there are no subscriptions.

Am I missing something obvious?

RickeyWard commented 5 years ago

I realized that this is based on rx 1.x, calling Schedulers.shutdown() when you're done with the threads resolves the issue. thanks.

davidmoten commented 5 years ago

I realized that this is based on rx 1.x, calling Schedulers.shutdown() when you're done with the threads resolves the issue. thanks.

Yep, that's the case. Glad you're sorted.

davidmoten commented 5 years ago

Yep, by the way I'd use Schedulers.io() instead of Schedulers.newThread() pretty much always because you get reuse of the created threads from an unbounded pool. A scheduler created with Schedulers.newThread() must be disposed of manually otherwise you can get a memory leak (unless of course you use Schedulers.shutdown()).

RickeyWard commented 5 years ago

Thanks for the suggestion. I was having issues with the threads hanging around in Schedulers.io() also but it might be a better choice to reuse them if I can. the Schedulers.newThread() seems to keep the references (or atleast one) around until you call shutdown. even though that thread has exited. I was creating my own wrapping thread with the to put the subscribe call in originally but then I couldn't shut down the thread safely unless some event happened. I wrapped your tailer in a buffer(x, milliseconds, x) which solved that but then that ended up using both Schedulers.io() and Schedulers.compute() internally so then I had 2 threads not stopping. But using Schedulers.shutdown() fixed all of them.

in rx 2.x the Schedule abstract includes a shutdown which lead me down a rabbit hole until I realized this was 1.x

I'm new to Java RX, so this was rough to wrap my head around.

davidmoten commented 5 years ago

If you want a bounded thread pool use Schedulers.from(executor). You still need to control the lifecycle of the ExecutorService and dispose the scheduler though. I can't remember if there are any gotchas calling Schedulers.shutdown (beyond stuffing up RxJava code running concurrently doing something else). Like for instance can you use RxJava 1.x again after shutdown is called? Concurrent calls on the bit of code you have would be a problem with shutdown happening asynchronously. I assume you're just calling it once?

RickeyWard commented 5 years ago

It's running in a tomcat servlet webapp, streaming log files via websockets. Currently I only call shutdown when the context listener destroy happens. (When tomcat is shutdown or more importantly if the webapp is undeployed so it doesn't leak threads). I'm okay will killing the threads even if they are busy. That would even be preferred in this case because they would no longer serve a purpose.

davidmoten commented 5 years ago

Cool, yep that sounds like the right use case for Schedulers.shutdown.