ReactiveX / RxScala

RxScala – Reactive Extensions for Scala – a library for composing asynchronous and event-based programs using observable sequences
Apache License 2.0

Weird behavior with multiple threads in single-threaded application #188

Closed: rvanheest closed this issue 4 years ago

rvanheest commented 8 years ago

I noticed some weird behavior in the following RxScala code:

package foo

import rx.lang.scala.Observable

object Main {

  def main(args: Array[String]): Unit = {
    Observable.just(1)
      .merge(Observable.just(2))
      .subscribe(println(_))

    println("DONE")
  }
}

When I run this from Maven (see the pom.xml below) with mvn exec:java -Dexec.mainClass="foo.Main" in a terminal, I get the output shown after the pom.xml.

Everything after DONE in the output is printed only after about 15 seconds.

I am aware that RxScala is just a wrapper around RxJava, so I also tried equivalent code in Java. I also tried calling the RxJava library directly from Scala. Both versions worked correctly: they terminated immediately after printing DONE, without the IllegalThreadStateException shown in the output.

The example above is already a reduced version; I originally ran into this through flatMap, which is itself defined as merge(map(...)). map on its own does not show the problem, but I have not tested any operators other than merge, map and flatMap.

Does anyone have an idea where these thread pools come from? Why are they created, and how can I prevent this? As far as I know this is a single-threaded application (i.e. no schedulers and no observeOn or subscribeOn operators are used), so I would not expect any thread pools here! I also cannot find any reference to them in the source code.

Thanks in advance for your help and support!

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test</groupId>
    <artifactId>test</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <groupId>io.reactivex</groupId>
            <artifactId>rxscala_2.11</artifactId>
            <version>0.26.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.7</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.4.0</version>
                <configuration>
                    <mainClass>foo.Main</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

output:

[INFO] Scanning for projects...
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building test 1.0
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ test ---
1
2
DONE
[WARNING] thread Thread[RxCachedWorkerPoolEvictor-1,5,testS.Main] was interrupted but is still alive after waiting at least 15000msecs
[WARNING] thread Thread[RxCachedWorkerPoolEvictor-1,5,testS.Main] will linger despite being asked to die via interruption
[WARNING] thread Thread[RxComputationThreadPool-1,5,testS.Main] will linger despite being asked to die via interruption
[WARNING] thread Thread[RxComputationThreadPool-2,5,testS.Main] will linger despite being asked to die via interruption
[WARNING] NOTE: 3 thread(s) did not finish despite being asked to  via interruption. This is not a problem with exec:java, it is a problem with the running code. Although not serious, it should be remedied.
[WARNING] Couldn't destroy threadgroup org.codehaus.mojo.exec.ExecJavaMojo$IsolatedThreadGroup[name=testS.Main,maxpri=10]
java.lang.IllegalThreadStateException
    at java.lang.ThreadGroup.destroy(ThreadGroup.java:778)
    at org.codehaus.mojo.exec.ExecJavaMojo.execute(ExecJavaMojo.java:328)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:207)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
    at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307)
    at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193)
    at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106)
    at org.apache.maven.cli.MavenCli.execute(MavenCli.java:863)
    at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:288)
    at org.apache.maven.cli.MavenCli.main(MavenCli.java:199)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 15.528 s
[INFO] Finished at: 2016-02-17T11:00:23+01:00
[INFO] Final Memory: 9M/309M
[INFO] ------------------------------------------------------------------------
akarnokd commented 8 years ago

RxJava has some daemon threads. By themselves they don't prevent the JVM from quitting, but they also don't stop while regular (non-daemon) threads are still around, which seems to be the case with the Maven executor. Usually, if RxJava code runs in a container-like host, the app is expected to clean up completely, including these threads, which can be done via Schedulers.shutdown().
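To illustrate the daemon-thread behavior described above, here is a minimal sketch that uses only the JDK, not RxJava. The class DaemonPoolDemo and the thread-name prefix are made up for illustration. It starts a scheduled pool of daemon threads running a periodic housekeeping task and then stops it explicitly, which is the role Schedulers.shutdown() plays for RxJava's own pools. A daemon thread never keeps the JVM alive by itself, but nothing stops it either until its pool is shut down, so a host that waits for leftover threads sees it lingering.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class DaemonPoolDemo {
    // Hypothetical name prefix, chosen only to resemble RxJava's thread names.
    static final String PREFIX = "DemoComputationThreadPool-";

    // Creates named daemon threads and records them so the caller can inspect them.
    static ThreadFactory daemonFactory(List<Thread> created) {
        return r -> {
            Thread t = new Thread(r, PREFIX + (created.size() + 1));
            t.setDaemon(true); // daemon threads never keep the JVM alive by themselves
            created.add(t);
            return t;
        };
    }

    static long countAlive(List<Thread> threads) {
        return threads.stream().filter(Thread::isAlive).count();
    }

    // Returns {live threads before shutdown, live threads after shutdown, 1 if all were daemons}.
    static long[] demo() {
        List<Thread> created = new CopyOnWriteArrayList<>();
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2, daemonFactory(created));
        // A periodic housekeeping task, similar in spirit to a pool evictor.
        pool.scheduleAtFixedRate(() -> { }, 0, 100, TimeUnit.MILLISECONDS);
        long before = countAlive(created);
        long allDaemon = created.stream().allMatch(Thread::isDaemon) ? 1 : 0;

        // Explicit cleanup, the role Schedulers.shutdown() plays for RxJava's pools.
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
            for (Thread t : created) {
                t.join(5_000); // wait until each worker thread has actually exited
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] { before, countAlive(created), allDaemon };
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println("alive before shutdown: " + r[0]);
        System.out.println("alive after shutdown: " + r[1]);
        System.out.println("all daemon: " + (r[2] == 1));
    }
}
```

Without the pool.shutdown() call, the worker thread would stay alive for as long as the JVM keeps running, which is what exec:java reports as lingering threads.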

rvanheest commented 8 years ago

First of all, thanks for the quick response! Adding Schedulers.shutdown() at the end of main worked great: the program now does what we expect it to do.

On the other hand, I find it a bit surprising that I, as a user of the library, have to do this myself, rather than the library cleaning up its own internal mess. The other odd thing is that this is apparently necessary with RxScala when running in a container-like environment such as Maven, but not with RxJava (or is it? I didn't notice it). Also, Schedulers.shutdown() is apparently not needed when running the app as a jar (or is it?)...

Thanks anyway for your help!

zsxwing commented 8 years ago

merge uses RxRingBuffer, which has an ObjectPool to cache objects, and ObjectPool calls Schedulers.computation.

I think we may need to document this behavior somewhere. However, I don't think the issue itself is a big deal. If you don't need a Scheduler, why use RxScala at all, especially since you are using Scala :)
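The mechanism described above, an operator that quietly relies on a pool which in turn starts a background thread, can be sketched with the JDK alone. Everything here is hypothetical: BufferPool, merge and the DemoPoolEvictor- thread name are illustrative stand-ins, not RxJava's ObjectPool or RxRingBuffer. The point is only that the caller of merge never mentions a scheduler, yet the first call leaves an evictor thread running until someone shuts the pool down.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HiddenPoolDemo {
    // Every evictor thread ever created, so callers can check which are still alive.
    static final List<Thread> EVICTORS = new CopyOnWriteArrayList<>();

    // Hypothetical shared pool of scratch queues, created lazily on first use.
    static final class BufferPool {
        private static BufferPool instance; // guarded by BufferPool.class
        private final ArrayDeque<ArrayDeque<Integer>> free = new ArrayDeque<>(); // guarded by this
        private final ScheduledExecutorService evictor;

        private BufferPool() {
            evictor = Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "DemoPoolEvictor-" + (EVICTORS.size() + 1));
                t.setDaemon(true);
                EVICTORS.add(t);
                return t;
            });
            // Side effect of creating the pool: a thread that periodically trims cached queues.
            evictor.scheduleAtFixedRate(this::trim, 1, 1, TimeUnit.SECONDS);
        }

        static synchronized BufferPool get() {
            if (instance == null) instance = new BufferPool();
            return instance;
        }

        // Explicit cleanup: stops the evictor; a later get() creates a fresh pool.
        static synchronized void shutdown() {
            if (instance != null) {
                instance.evictor.shutdownNow();
                instance = null;
            }
        }

        synchronized ArrayDeque<Integer> borrow() {
            ArrayDeque<Integer> q = free.poll();
            return q != null ? q : new ArrayDeque<>();
        }

        synchronized void release(ArrayDeque<Integer> q) { q.clear(); free.push(q); }

        private synchronized void trim() { while (free.size() > 4) free.pollLast(); }
    }

    // Hypothetical operator: joins two lists via a pooled scratch queue.
    // Nothing in its signature mentions threads or schedulers.
    static List<Integer> merge(List<Integer> a, List<Integer> b) {
        BufferPool pool = BufferPool.get(); // the first call creates the pool and its thread
        ArrayDeque<Integer> queue = pool.borrow();
        try {
            queue.addAll(a);
            queue.addAll(b);
            return new ArrayList<>(queue);
        } finally {
            pool.release(queue);
        }
    }

    static long liveEvictors() {
        return EVICTORS.stream().filter(Thread::isAlive).count();
    }

    // Shuts the pool down and waits until its evictor threads have exited.
    static void shutdownAndWait() {
        BufferPool.shutdown();
        try {
            for (Thread t : EVICTORS) t.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("evictors before merge: " + liveEvictors());
        System.out.println(merge(Arrays.asList(1), Arrays.asList(2)));
        System.out.println("evictors after merge: " + liveEvictors());
        shutdownAndWait();
        System.out.println("evictors after shutdown: " + liveEvictors());
    }
}
```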

rvanheest commented 8 years ago

This is just a minimal example I came up with to make the initial question easier to ask (instead of showing pages upon pages of code 😄).

What is not clear to me, though, is why a user of merge should have to clean up things that are only used in the implementation and are not really exposed to the outside world. This may be the intended behavior, but I think it would be a great idea to at least document it somewhere!

Also, any idea why I have this problem with the code above in RxScala (in Scala), but not when I use RxJava in Java or RxJava in Scala? It seems a bit weird to me, given that RxScala is only a wrapper around RxJava.

zsxwing commented 8 years ago

> Also, any suggestion of why I have this problem with the above code in RxScala in Scala, but not when I use RxJava in Java nor when I use RxJava in Scala? Seems a bit weird to me, given that RxScala is only a wrapper around RxJava.

Did you try the latest RxJava? I can reproduce it using the latest RxJava.

akarnokd commented 8 years ago

> On the other hand I find it a bit surprising that I as a user of the library have to do this myself, rather than the library cleaning up its own internal mess.

Because RxJava is a library, not a framework, it can't know when you are done using it. But it does give you the option to clean up in case its lifecycle should be shorter than that of the enclosing app.

In theory, your example shouldn't touch the schedulers or RxRingBuffer at all, so there is no reason for Rx threads to start. Maybe some implicit class initialization is happening in the RxScala wrapper around Schedulers, and that is why you see Rx threads started. Compare with plain RxJava:

import rx.Observable;

public class RxWhoseThreads {
    public static void main(String[] args) throws Exception {
        Observable.just(1).mergeWith(Observable.just(2)).subscribe(System.out::println);

        // Give any background threads a moment to start
        Thread.sleep(1000);

        // List every live thread whose name contains "rx"
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().toLowerCase().contains("rx")) {
                System.out.println(t);
            }
        }
    }
}

It prints nothing but 1 and 2 on RxJava 1.1.1 (Windows).
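A generalization of the check above, as a sketch: instead of filtering on "rx" in the thread name, snapshot the live threads before an action and report the ones that appeared while it ran. ThreadDiff and threadsStartedBy are hypothetical helper names, and the stand-in action simply starts one daemon thread.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

public class ThreadDiff {
    // Runs the action and returns the names of threads that were not alive before it
    // ran but are alive right after it returns (threads are compared by identity).
    static Set<String> threadsStartedBy(Runnable action) {
        Set<Thread> before = new HashSet<>(Thread.getAllStackTraces().keySet());
        action.run();
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> !before.contains(t))
                .map(Thread::getName)
                .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        CountDownLatch done = new CountDownLatch(1);
        // Stand-in action that leaves a background daemon thread behind.
        Set<String> started = threadsStartedBy(() -> {
            Thread t = new Thread(() -> {
                try {
                    done.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "DemoLingering-1");
            t.setDaemon(true);
            t.start();
        });
        System.out.println("threads started by the action: " + started);
        done.countDown(); // let the lingering thread finish
    }
}
```

With RxJava 1.x on the classpath, the action could be the snippet from this thread, for example () -> Observable.just(1).mergeWith(Observable.just(2)).subscribe(System.out::println). Threads that start asynchronously shortly after the action returns can be missed, which is presumably why the check above sleeps for a second first.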

zsxwing commented 8 years ago

I see. The single-argument just takes a different code path (RxScala's just doesn't call it; it calls from(Iterable<? extends T> iterable) instead). The following code should reproduce it:

import rx.Observable;

public class RxWhoseThreads {
    public static void main(String[] args) {
        Observable.just(1, 2).mergeWith(Observable.just(2, 3)).subscribe(System.out::println);
        Thread.getAllStackTraces().keySet().forEach(t -> System.out.println(t.getName()));
    }
}

My output is:

1
2
2
3
RxComputationThreadPool-2
Monitor Ctrl-Break
RxCachedWorkerPoolEvictor-1
Finalizer
Signal Dispatcher
RxComputationThreadPool-1
Reference Handler
main
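The code-path difference described above can be modeled with a small hypothetical sketch (JDK only; Source, just, from and merge here are illustrative and are not RxJava's classes or its actual merge implementation). A single-argument just produces a source marked as scalar, which merge emits directly; from(list) is never treated as scalar, even for a one-element list, so merge stages its values through lazily created shared infrastructure. That mirrors, under these assumptions, why RxJava's just(1) stayed clear of RxRingBuffer while RxScala's just(1), which goes through from(Iterable), did not.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ScalarFastPathDemo {
    // Hypothetical source: its values plus a flag saying whether it is a "scalar".
    static final class Source {
        final List<Integer> values;
        final boolean scalar;

        private Source(List<Integer> values, boolean scalar) {
            this.values = values;
            this.scalar = scalar;
        }

        // Single-argument factory: produces a scalar source.
        static Source just(int value) {
            return new Source(Collections.singletonList(value), true);
        }

        // Iterable-based factory: never scalar, even for a one-element list.
        static Source from(List<Integer> values) {
            return new Source(values, false);
        }
    }

    // Counts how often the shared buffer was created; stands in for lazily
    // initialized infrastructure that, in a real library, might start threads.
    static int bufferInitializations = 0;
    private static ArrayDeque<Integer> sharedBuffer;

    private static ArrayDeque<Integer> buffer() {
        if (sharedBuffer == null) {
            bufferInitializations++;
            sharedBuffer = new ArrayDeque<>();
        }
        return sharedBuffer;
    }

    // Hypothetical, single-threaded merge: scalars take a fast path that emits the value
    // directly; all other sources are staged through the shared buffer.
    static List<Integer> merge(Source a, Source b) {
        List<Integer> out = new ArrayList<>();
        for (Source s : Arrays.asList(a, b)) {
            if (s.scalar) {
                out.add(s.values.get(0));
            } else {
                ArrayDeque<Integer> buf = buffer();
                buf.addAll(s.values);
                while (!buf.isEmpty()) {
                    out.add(buf.poll());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merge(Source.just(1), Source.just(2)));  // prints [1, 2]
        System.out.println("buffer initializations: " + bufferInitializations);  // 0
        System.out.println(merge(Source.from(Arrays.asList(1)), Source.from(Arrays.asList(2))));  // prints [1, 2]
        System.out.println("buffer initializations: " + bufferInitializations);  // 1
    }
}
```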
akarnokd commented 8 years ago

Yep, RxRingBuffer.