brettwooldridge / NuProcess

Low-overhead, non-blocking I/O, external Process implementation for Java
Apache License 2.0

Process becomes "zombie" forever #100

Closed zella closed 2 years ago

zella commented 5 years ago

I run one process. After it exits, I run multiple processes.

Here is code that always fails on my machine (Ubuntu 18.10, Xeon 1270). It's based on RxJava 2, but I'm not able to reproduce it any other way.

Some comments about the code:

onExit signals that the process has ended (first process only). After that (flatMap), multiple processes are started sequentially (Single.concat). Note that the termination of these processes is handled synchronously, with waitFor. It's important that I can reproduce this only when the first termination is handled asynchronously and the following ones synchronously.

import com.zaxxer.nuprocess.NuAbstractProcessHandler;
import com.zaxxer.nuprocess.NuProcess;
import com.zaxxer.nuprocess.NuProcessBuilder;
import io.reactivex.Single;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class NuProcessBug {

    public static Single<Integer> async(List<String> command) {
        return Single.create(emitter -> {
            NuProcessBuilder builder = new NuProcessBuilder(command);
            builder.setProcessListener(new NuAbstractProcessHandler() {
                @Override
                public void onExit(int statusCode) {
                    emitter.onSuccess(statusCode);
                }
            });
            builder.start();

        });
    }

    public static Single<Integer> sync(List<String> command) {
        return Single.create(emitter -> {
            NuProcessBuilder builder = new NuProcessBuilder(command);
            builder.setProcessListener(new NuAbstractProcessHandler() {
            });
            NuProcess p = builder.start();
            int code = p.waitFor(9999, TimeUnit.SECONDS);
            emitter.onSuccess(code);
        });
    }

    public static void main(String[] args) {
        List<String> cmd = List.of("python3", "/sleep.py");

        async(cmd)
                .flatMap(something -> Single.concat(
                        Stream.of(1, 2, 3, 4, 5, 6, 7, 8).map(i -> sync(cmd)
                                .doOnSubscribe(d -> System.out.println(i + " start"))
                                .doFinally(() -> System.out.println(i + " end"))
                        )
                                .collect(Collectors.toList())
                ).toList())
                .blockingGet();

    }
}

The process is a Python script:

import time
time.sleep(5000)

Output

1 start
1 end
2 start
2 end
3 start
3 end
4 start

And the 4th process becomes a "zombie" forever.

bturner commented 3 years ago

You're calling emitter.onSuccess in onExit, which happens on a NuProcess thread. NuProcessHandler callbacks must not block when used with start() or they can deadlock, and that's exactly what happens here. Looking at the call stack for your test when it hangs shows this:

"ProcessQueue0" #14 daemon prio=5 os_prio=0 tid=0x00007f6668564000 nid=0x1911 waiting on condition [0x00007f6649e76000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076e93cb10> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
    at com.zaxxer.nuprocess.internal.BasePosixProcess.waitFor(BasePosixProcess.java:167)
    at com.zaxxer.nuprocess.Test100.lambda$sync$5(Test100.java:55)
    at com.zaxxer.nuprocess.Test100$$Lambda$15/1253548091.subscribe(Unknown Source)
    at io.reactivex.rxjava3.internal.operators.single.SingleCreate.subscribeActual(SingleCreate.java:40)
    at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4813)
    at io.reactivex.rxjava3.internal.operators.single.SingleDoOnSubscribe.subscribeActual(SingleDoOnSubscribe.java:41)
    at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4813)
    at io.reactivex.rxjava3.internal.operators.single.SingleDoFinally.subscribeActual(SingleDoFinally.java:44)
    at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4813)
    at io.reactivex.rxjava3.internal.operators.mixed.FlowableConcatMapSingle$ConcatMapSingleSubscriber.drain(FlowableConcatMapSingle.java:260)
    at io.reactivex.rxjava3.internal.operators.mixed.FlowableConcatMapSingle$ConcatMapSingleSubscriber.onNext(FlowableConcatMapSingle.java:136)
    at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable$IteratorSubscription.slowPath(FlowableFromIterable.java:243)
    at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable$BaseRangeSubscription.request(FlowableFromIterable.java:131)
    at io.reactivex.rxjava3.internal.operators.mixed.FlowableConcatMapSingle$ConcatMapSingleSubscriber.onSubscribe(FlowableConcatMapSingle.java:125)
    at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable.subscribe(FlowableFromIterable.java:69)
    at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable.subscribeActual(FlowableFromIterable.java:47)
    at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15750)
    at io.reactivex.rxjava3.internal.operators.mixed.FlowableConcatMapSingle.subscribeActual(FlowableConcatMapSingle.java:61)
    at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15750)
    at io.reactivex.rxjava3.internal.operators.flowable.FlowableToListSingle.subscribeActual(FlowableToListSingle.java:56)
    at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4813)
    at io.reactivex.rxjava3.internal.operators.single.SingleFlatMap$SingleFlatMapCallback.onSuccess(SingleFlatMap.java:85)
    at io.reactivex.rxjava3.internal.operators.single.SingleCreate$Emitter.onSuccess(SingleCreate.java:68)
    at com.zaxxer.nuprocess.Test100$1.onExit(Test100.java:38)
    at com.zaxxer.nuprocess.internal.BasePosixProcess.onExit(BasePosixProcess.java:319)
    at com.zaxxer.nuprocess.linux.ProcessEpoll.handleExit(ProcessEpoll.java:371)
    at com.zaxxer.nuprocess.linux.ProcessEpoll.cleanupProcess(ProcessEpoll.java:334)
    at com.zaxxer.nuprocess.linux.ProcessEpoll.process(ProcessEpoll.java:272)
    at com.zaxxer.nuprocess.internal.BaseEventProcessor.run(BaseEventProcessor.java:81)
    at com.zaxxer.nuprocess.linux.ProcessEpoll.run(ProcessEpoll.java:188)
    at java.lang.Thread.run(Thread.java:748)

"ProcessQueue0" is one of NuProcess's pump threads. NuProcess assigns them in round-robin fashion, so the async process always gets 0, then your "synchronous" processes (which actually aren't synchronous; more on that shortly) are started 1 on 1, 2 on 2, 3 on 3 and 4 on 0. Why 4 on 0? Well, usefully, you included your processor in your description. A Xeon 1270 is a 4 core/8 thread CPU. NuProcess's default thread count is "auto", which is the number of presented processors (8 because of SMT) divided by 2: 4. If you run your test with -Dcom.zaxxer.nuprocess.threads=10, you'll see that it actually completes because it never gets to a point where the round-robin resets back to the 0th thread.

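The thread assignment described above can be sketched as plain arithmetic. This is a simplified model, not NuProcess's actual code: the class and both helper methods are hypothetical, and the `Math.max(1, ...)` guard is an assumption added only so the sketch never divides by zero.

```java
public class RoundRobinModel {
    // Hypothetical helper: the "auto" rule as described above, processors / 2.
    // The lower bound of 1 is an assumption of this sketch.
    static int autoThreadCount(int processors) {
        return Math.max(1, processors / 2);
    }

    // Hypothetical helper: pump thread index for the n-th started process (0-based),
    // assuming strict round-robin assignment.
    static int threadFor(int startIndex, int threads) {
        return startIndex % threads;
    }

    public static void main(String[] args) {
        int threads = autoThreadCount(8); // 8 presented processors, as on a 4 core/8 thread CPU
        // Index 0 is the async process; indexes 1..4 are the first four waitFor-based processes.
        for (int i = 0; i <= 4; i++) {
            System.out.println("process " + i + " -> ProcessQueue" + threadFor(i, threads));
        }
    }
}
```

Under these assumptions, the process at index 4 wraps around to thread 0, which is the thread already blocked in onExit. With 10 threads, none of the nine processes in the test wraps around.
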
When I said your processes aren't actually synchronous, it's because they're still pumped in the background on a NuProcess thread. All you're doing synchronously is blocking until the async thread completes. NuProcess 2.0.0 introduced NuProcessBuilder.run(), which actually does run the process synchronously. If you change your sync method to be like this:

    public static Single<Integer> sync(List<String> command, int id) {
        return Single.create(emitter -> {
            NuProcessBuilder builder = new NuProcessBuilder(command);
            builder.setProcessListener(new NuAbstractProcessHandler() {
                @Override
                public void onExit(int statusCode) {
                    emitter.onSuccess(statusCode);
                }
            });
            builder.run();
        });
    }

your test also passes.

However, the above still runs all of your subsequent processes directly on "ProcessQueue0". This means that if anything else in the system uses NuProcessBuilder.start() to run a process while this is happening, that process could "hang" if it gets assigned to "ProcessQueue0", because you're still doing blocking work directly on a NuProcess thread.

I'm not that familiar with RxJava, but you need to find an approach that moves the follow-up processing off whatever thread calls emitter.onSuccess and onto some other thread, like an executor service (or the requesting thread, or any thread that doesn't belong to NuProcess).
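
As a rough illustration of that hand-off, here is a minimal sketch that uses only java.util.concurrent, with no NuProcess or RxJava involved. Everything in it is hypothetical: `pump` is a single-threaded executor standing in for one NuProcess pump thread, `workers` is an unrelated pool, and `startAndWait` models a start() followed by waitFor() whose exit notification is delivered on the pump thread. In RxJava the equivalent is probably an observeOn with a non-NuProcess scheduler placed before the flatMap, but that is untested here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HandOffSketch {
    // Hypothetical stand-in for one NuProcess pump thread: every "exit callback" runs here.
    static final ExecutorService pump = Executors.newSingleThreadExecutor(r -> daemon(r, "pump"));
    // Separate pool for follow-up work, so the pump thread is never blocked by it.
    static final ExecutorService workers = Executors.newCachedThreadPool(r -> daemon(r, "worker"));

    static Thread daemon(Runnable r, String name) {
        Thread t = new Thread(r, name);
        t.setDaemon(true); // let the JVM exit without an explicit shutdown
        return t;
    }

    // Models start() + waitFor(): the exit is delivered on the pump thread
    // while the caller blocks until it arrives or the timeout elapses.
    static boolean startAndWait(long timeoutMs) {
        CountDownLatch exited = new CountDownLatch(1);
        pump.execute(exited::countDown);
        try {
            return exited.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Follow-up runs inside the callback, on the pump thread itself.
    // The exit notification is queued behind the blocked callback, so the wait times out.
    static boolean followUpOnPumpThread() {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        pump.execute(() -> result.complete(startAndWait(500)));
        return result.join();
    }

    // The callback only hands the follow-up to another thread and returns immediately,
    // leaving the pump thread free to deliver the next exit notification.
    static boolean followUpOnWorker() {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        pump.execute(() -> workers.execute(() -> result.complete(startAndWait(500))));
        return result.join();
    }

    public static void main(String[] args) {
        System.out.println("blocking in callback saw exit: " + followUpOnPumpThread());
        System.out.println("after hand-off saw exit: " + followUpOnWorker());
    }
}
```

In this model the first call should report false, because the wait can only end by timing out, and the second should report true. The real test hangs instead of timing out only because its waitFor timeout is 9999 seconds.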

As far as I can see, there's no issue with NuProcess here. The issue is that you can't block in a NuProcessHandler callback, which is explicitly documented on the interface:

Note that the processing thread that executes these callbacks on behalf of a NuProcess is the same thread that handles processing for other instances of NuProcess, and as a result you should never perform a blocking or time-consuming operation on the callback thread. Doing so will block or severely hinder the processing of other NuProcess instances. Any operation that requires more than single-digit millisecond processing should be handed off to another thread so that the processing thread can return to immediately service other NuProcess instances.

bturner commented 3 years ago

I used RxJava 3, but it shouldn't matter. While the stack trace on RxJava 2 is probably different, the problem will be the same. You can't block in a NuProcessHandler callback.