swimos / swim

Full stack application platform for building stateful microservices, streaming APIs, and real-time UIs
https://www.swimos.org
Apache License 2.0

Long-running Stage tasks prevent SIGINT through SIGTERM type process termination #133

Open brohitbrose opened 10 months ago

brohitbrose commented 10 months ago

Perhaps worth noting, though, that the Swim port (9001 in the example below) does get released, so some sort of cleanup does seem to happen. Swim does have a shutdown hook, so the bug is probably in there.

Small (hopefully minimal) example:

```java
// WastefulAgent.java
package nstream.starter;

import swim.api.agent.AbstractAgent;
import swim.kernel.Kernel;
import swim.server.ServerLoader;

public class WastefulAgent extends AbstractAgent {

  @Override
  public void didStart() {
    System.out.println(nodeUri() + ": didStart");
    // Deliberately never returns
    while (true) {
      // You may wish to throttle this a bit
      System.out.println(nodeUri() + ": ping");
    }
  }

  public static void main(String[] args) {
    final Kernel kernel = ServerLoader.loadServer();
    kernel.start();
    System.out.println("Running Main ...");
    kernel.run();
  }

}
```

```
# server.recon

"starter": @fabric {
  @plane(class: "swim.api.plane.AbstractPlane")
  @node {
    uri: "/wasteful"
    @agent(class: "nstream.starter.WastefulAgent")
  }
}

@web(port: 9001) {
  space: "starter"
  @websocket {
    serverCompressionLevel: 0 # -1 = default; 0 = off; 1-9 = deflate level
    clientCompressionLevel: 0 # -1 = default; 0 = off; 1-9 = deflate level
  }
}
```

Running the main method results in a process that cannot be stopped short of kill -9. This happens whether it is launched from an IDE, through Gradle, or with a plain java command. (When running with Gradle, Ctrl+C appears to work, but the process is still visible in ps or top.)

Variations

asyncStage()

It's especially disturbing that moving the infinite loop to a proper asyncStage() call does not fix anything:

```java
package nstream.starter;

import swim.api.agent.AbstractAgent;
import swim.concurrent.AbstractTask;

public class WastefulAgent extends AbstractAgent {

  @Override
  public void didStart() {
    System.out.println(nodeUri() + ": didStart");
    // Run the infinite loop as a task on the agent's async stage instead of inline
    asyncStage().task(new AbstractTask() {
      @Override
      public void runTask() {
        while (true) {
          // You may wish to throttle this a bit
          System.out.println(nodeUri() + ": ping");
        }
      }

      @Override
      public boolean taskWillBlock() {
        return true; // tells the stage this task may block
      }
    }).cue();
  }

  // main() omitted; launch as in the first example

}
```

brohitbrose commented 10 months ago

swim.concurrent.Theater appears to contain the root cause of this problem. Theater#stop will hang if a currently-executing Task within it never completes.

The relevant block is:

```java
while (!this.pool.isTerminated()) {
  try {
    this.pool.awaitTermination(100, TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
    interrupted = true;
  }
}
```

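All this does is: "while the pool hasn't terminated, block for up to 100 ms waiting for it to terminate." A task that never completes keeps the pool from ever terminating, so isTerminated() returns false forever and the loop never exits. Interrupting the stopping thread doesn't help either, since the InterruptedException is caught and the loop simply continues.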
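To make the failure mode concrete outside of Swim, here is a minimal sketch that uses a plain java.util.concurrent ExecutorService in place of Theater's actual pool (an assumption; the names StuckPoolDemo, pollsWithoutTermination and keepSpinning are hypothetical). It shuts the pool down while one task spins, then runs the same poll-with-timeout loop as above, capped at maxPolls iterations so the demo itself can finish, and counts how many polls still see a live pool.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class StuckPoolDemo {

  /**
   * Shuts down a pool whose only task spins, then runs the Theater#stop-style
   * poll loop for at most maxPolls iterations. Returns how many polls found
   * the pool still not terminated.
   */
  public static int pollsWithoutTermination(int maxPolls) {
    final AtomicBoolean keepSpinning = new AtomicBoolean(true);
    final ExecutorService pool = Executors.newFixedThreadPool(1);
    pool.execute(() -> {
      // Stand-in for the issue's while (true) loop; the flag exists only
      // so this demo can clean up after itself.
      while (keepSpinning.get()) {
        Thread.onSpinWait();
      }
    });
    pool.shutdown(); // rejects new tasks but leaves the running one alone

    boolean interrupted = false;
    int stuckPolls = 0;
    for (int i = 0; i < maxPolls && !pool.isTerminated(); i++) {
      try {
        pool.awaitTermination(100, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        interrupted = true; // swallowed, as in the quoted loop
      }
      if (!pool.isTerminated()) {
        stuckPolls++;
      }
    }

    keepSpinning.set(false); // release the task so the JVM can exit
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
    return stuckPolls;
  }

  public static void main(String[] args) {
    System.out.println("polls that saw a live pool: " + pollsWithoutTermination(5));
  }
}
```

Every capped poll observes an unterminated pool because the task is still executing; the uncapped loop in Theater#stop would keep polling indefinitely.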

Perhaps the right logic is "while the pool hasn't shut down, give each incomplete task within it up to X milliseconds to finish," or even "while the pool hasn't shut down, force it to shut down." Either could cause issues with persistence, though, and some tasks are surely "more important" than others (the Swim Clock already escapes this; we may need to exempt others if we go down this route).
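One way to express the bounded option, sketched with a standard ExecutorService rather than Theater (an assumption; BoundedStop, stop and graceMillis are hypothetical names, and the sketch ignores the persistence and task-priority concerns just mentioned): wait a grace period, then call shutdownNow() to interrupt running tasks, and wait once more.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedStop {

  /**
   * Hypothetical bounded replacement for an open-ended stop loop: give
   * running tasks graceMillis to finish, then interrupt them with
   * shutdownNow() and wait graceMillis once more.
   * Returns true if the pool actually terminated.
   */
  public static boolean stop(ExecutorService pool, long graceMillis) {
    pool.shutdown(); // stop accepting new tasks
    try {
      if (!pool.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)) {
        pool.shutdownNow(); // interrupts running tasks, drops queued ones
        pool.awaitTermination(graceMillis, TimeUnit.MILLISECONDS);
      }
    } catch (InterruptedException e) {
      pool.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the caller's interrupt
    }
    return pool.isTerminated();
  }

  public static void main(String[] args) {
    final ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.execute(() -> {
      // A cooperative long-running task: it checks its interrupt flag,
      // so shutdownNow() is able to end it.
      while (!Thread.currentThread().isInterrupted()) {
        Thread.onSpinWait();
      }
    });
    System.out.println("terminated: " + stop(pool, 200));
  }
}
```

shutdownNow() can only interrupt. A loop that never checks its interrupt status, such as the while (true) loops in this issue, would keep running, and stop would return false.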

Note that it does not seem possible to cancel() a currently running Task via its TaskRef: cancel() only returns true if the task is cued, and a running task is not cued (ignoring re-cues, which are not pertinent here). So even if we were to get handles on specific tasks within the stop() method to tell them apart (which currently seems rather difficult), that wouldn't make cancelling individual tasks much easier.
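For comparison only (this is the JDK's Future, not Swim's TaskRef): java.util.concurrent.Future.cancel(true) does return true for a task that is already running, but all it does is interrupt the worker thread. The sketch below (CancelRunningDemo and its variables are hypothetical names) shows that a loop which never checks its interrupt flag keeps running after a successful cancel.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class CancelRunningDemo {

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Returns true if cancel(true) succeeded but the task kept running anyway. */
  public static boolean keepsRunningAfterCancel() {
    final AtomicBoolean demoOver = new AtomicBoolean(false);
    final AtomicLong spins = new AtomicLong();
    final ExecutorService pool = Executors.newSingleThreadExecutor();
    final Future<?> future = pool.submit(() -> {
      // Never checks Thread.interrupted(), like the loops in this issue;
      // demoOver exists only so this demo can clean up after itself.
      while (!demoOver.get()) {
        spins.incrementAndGet();
      }
    });
    sleepQuietly(100); // give the task time to start running
    final boolean cancelled = future.cancel(true); // only interrupts the worker
    final long before = spins.get();
    sleepQuietly(100);
    final boolean stillRunning = spins.get() > before;
    demoOver.set(true); // now actually stop the loop
    pool.shutdown();
    return cancelled && stillRunning;
  }

  public static void main(String[] args) {
    System.out.println("kept running after cancel: " + keepsRunningAfterCancel());
  }
}
```

So even an API that accepts cancellation of running tasks cannot stop a non-cooperative loop; the task itself has to check for interruption.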

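Also note that the following code does NOT cause the same non-terminable situation: it loops forever, but Ctrl+C stops it fine. This is probably because SIGINT simply escapes the await: the JVM runs any registered shutdown hooks and then halts, even while the main thread is still blocked in awaitTermination, and this program registers no hooks. The problem in Swim is that SIGINT (eventually) triggers the await logic above.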

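```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class SpinMain {
  public static void main(String[] args) throws Exception {
    final ForkJoinPool pool = new ForkJoinPool(2);
    pool.submit(() -> { while (true) System.out.println("spin"); });
    // Blocks indefinitely, yet Ctrl+C still ends the process
    pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
  }
}
```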