reactor / reactor-netty

TCP/HTTP/UDP/QUIC client/server with Reactor over Netty
https://projectreactor.io
Apache License 2.0
2.61k stars 647 forks source link

CPU intesive work prevents LoopResources.disposeLater from running #3509

Closed philwebb closed 1 week ago

philwebb commented 2 weeks ago

Please see https://github.com/spring-projects/spring-boot/issues/21426 for background.

The following sample from @wilkinsona reproduces the issue:

@Test
public void testGracefulShutdown() throws Exception {
    CountDownLatch latch1 = new CountDownLatch(2);
    CountDownLatch latch2 = new CountDownLatch(2);
    LoopResources loop = LoopResources.create("testGracefulShutdown");
    this.disposableServer = HttpServer.create().port(0).runOn(loop).doOnConnection(c -> {
        c.onDispose().subscribe(null, null, latch2::countDown);
        latch1.countDown();
    })
        // Register a channel group, when invoking disposeNow()
        // the implementation will wait for the active requests to finish
        .channelGroup(new DefaultChannelGroup(new DefaultEventExecutor()))
        .route(r -> r
            .get("/delay500",
                    (req, res) -> res.sendString(Mono.just("delay500").delayElement(Duration.ofMillis(500))))
            .get("/cpuIntensive", (req, res) -> {
                // Simulate some long-running CPU-intensive work
                boolean stop = false;
                while (!stop) {
                }
                return res.sendString(Mono.just("cpuIntensive"));
            }))
        .wiretap(true)
        .bindNow(Duration.ofSeconds(30));

    HttpClient client = HttpClient.create().remoteAddress(this.disposableServer::address).wiretap(true);

    MonoProcessor<String> result = MonoProcessor.create();
    Flux.just("/delay500", "/cpuIntensive")
        .flatMap(s -> client.get().uri(s).responseContent().aggregate().asString())
        .collect(Collectors.joining())
        .subscribe(result);

    assertThat(latch1.await(30, TimeUnit.SECONDS)).isTrue();

    // Stop accepting incoming requests, wait at most 3s for the active requests to
    // finish
    try {
        this.disposableServer.disposeNow();
    }
    catch (IllegalStateException ex) {
        System.out.println(ex.getMessage());
        // The socket couldn't be stopped, continue with shutdown
    }

    System.out.println("Disposing of the loop resources");
    loop.disposeLater().block();
    System.out.println("Disposal complete");

    assertThat(latch2.await(30, TimeUnit.SECONDS)).isTrue();
}