micronaut-projects / micronaut-core

Micronaut Application Framework
http://micronaut.io
Apache License 2.0
6.03k stars 1.05k forks source link

Micronaut Client doesn't provide way to close keep alive connection when cancelled #2913

Open stepanv opened 4 years ago

stepanv commented 4 years ago

Based on my investigation, there is no way to instruct the Micronaut Client to close the connection (when keep alive is enabled) in case of

My use cases are as follows:

  1. My Micronaut Http Client sends a request to a "friendly" downstream service. It times out after 10 seconds, however the connection remains open and as a result the downstream service has no clue it can stop doing the computation. Furthermore, my Micronaut Http Client has to consume the incoming data. Which is unfortunate in case the downstream service sends back 100MB of data for instance. It consumes resources on both sides.
  2. My Micronaut Http Client aggregates results from several downstream services, if any of the downstream calls fails, I want to cancel all other ongoing downstream calls because I don't need the results anymore. The same applies as above, the responses from all the services have to be read. It again consumes resources on both sides.

Consider following 3 examples. All of them pass, but in all the cases, the client and the server must proceed with the computation because the connection is not closed.

Also enable http logging to see all the data are sent anyway (in logback.xml)

<logger name="io.micronaut.http" level="TRACE"/>
package io.micronaut.http.client

import io.micronaut.context.ApplicationContext
import io.micronaut.http.HttpRequest
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.QueryValue
import io.micronaut.http.client.exceptions.ReadTimeoutException
import io.micronaut.runtime.server.EmbeddedServer
import io.reactivex.Flowable
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import spock.lang.Specification

import java.time.Duration
import java.util.concurrent.atomic.AtomicLong

class ReadCancelledSpec extends Specification {

    private static final Logger LOGGER = LoggerFactory.getLogger(ReadCancelledSpec);

    void "test http client resources are not utilized after client read timeout"() {
        given:"client with timeout 2s"

        ApplicationContext clientContext = ApplicationContext.run(
                'micronaut.http.client.read-timeout':'2s')
        def server = clientContext.getBean(EmbeddedServer).start()
        RxHttpClient client = clientContext.createBean(RxHttpClient, server.getURL())

        def nexts = []
        def errors = []

        when:"we send a request that lasts for 3 seconds"
        def result = client.retrieve(HttpRequest.GET('/slow?duration=3s&size=1000'), String)

        result.subscribe({nexts.add(it)}, {errors.add(it)})
        Thread.sleep(4_000)

        then:"timeout occurred"
        nexts.isEmpty()

        errors.size() == 1
        errors.every { it instanceof ReadTimeoutException }

        cleanup:
        client.close()
        clientContext.close()
    }

    void "test http client resources are not utilized when reactive result object is disposed"() {
        given:"no client read timeout"

        ApplicationContext clientContext = ApplicationContext.run(
                'micronaut.http.client.read-timeout':'-1s')
        def server = clientContext.getBean(EmbeddedServer).start()
        RxHttpClient client = clientContext.createBean(RxHttpClient, server.getURL())

        def nexts = []
        def errors = []

        when:"we call the endpoint that is slowly emitting the response"
        def result = client.retrieve(HttpRequest.GET('/slow?duration=4s&size=1000'), String)

        def disposable = result.subscribe({ nexts.add(it) }, { errors.add(it) })

        then:"wait for 2 seconds"
        Thread.sleep(2_000)

        when:"and we dispose the result"
        disposable.dispose()

        Thread.sleep(3_000)

        then:"no response or error occurred"
        nexts.isEmpty()
        errors.isEmpty()

        cleanup:
        client.close()
        clientContext.close()
    }

    void "test http streaming client resources are freed when reactive result object is disposed"() {
        given:"no client read timeout"

        ApplicationContext clientContext = ApplicationContext.run(
                'micronaut.http.client.read-timeout':'-1s')
        def server = clientContext.getBean(EmbeddedServer).start()
        RxStreamingHttpClient client = clientContext.createBean(RxStreamingHttpClient, server.getURL())

        def nexts = []
        def errors = []

        when:"we call the endpoint that is slowly emitting the response"
        def result = client.exchangeStream(HttpRequest.GET('/slow?duration=10s&size=1000'))

        def disposable = result.subscribe({ nexts.add(it.body.get().toByteArray()) }, 
                                          { errors.add(it) })

        then:"wait for 2 seconds"
        Thread.sleep(2_000)

        when:"and we dispose the result"
        disposable.dispose()

        Thread.sleep(15_000)

        then:"we got less than all the possibly emitted items because we disposed it"
        nexts.size() < 5
        nexts.every {
            new String(it).matches("^a{1000}\$")
        }
        errors.isEmpty()

        cleanup:
        client.close()
        clientContext.close()
    }

    @Controller("/")
    static class TestController {

        @Get("/slow")
        Flowable<byte[]> sleep(@QueryValue("duration") Duration duration, @QueryValue("size") int size) throws InterruptedException {

            AtomicLong sleptMillis = new AtomicLong(0)

            return Flowable.generate({ sink ->
                LOGGER.info("Sending item!")
                sink.onNext(String.join("", Collections.nCopies(size, "a")).bytes)
                try {
                    if (sleptMillis.get() < duration.toMillis()) {
                        Thread.sleep(1000)
                        sleptMillis.addAndGet(1000)
                    } else {
                        sink.onComplete()
                    }

                }
                catch (InterruptedException e) {
                    LOGGER.error("Interrupted!", e)
                    Thread.currentThread().interrupt()
                    sink.onError(new RuntimeException("shutting down", e))
                }
            })
        }
    }
}
stepanv commented 4 years ago

@graemerocher, I'm willing to contribute this feature as long as it's something you'd (or someone else from the authors would) be willing to guide, review and merge.

graemerocher commented 4 years ago

PRs welcome!

BartUK commented 3 years ago

A bit old thread here :) you mentioned when keep alive is enabled but based on the code you sent there is no keep-alive setting being configured. Am I missing something?

stepanv commented 3 years ago

@BartUK it's possible it's Netty's behavior (or Micronaut) to automatically use keep-alive. Try to use Micronaut Client and in the log you'd see keep alive is there:

2021-09-02 CEST 16:39:26,525 [default-nioEventLoopGroup-1-4] [] [tid=] [sid=] [] [] DEBUG io.micronaut.http.client.netty.DefaultHttpClient - Sending HTTP POST to http://localhost:56122/monitoring/v1/foo
2021-09-02 CEST 16:39:26,528 [default-nioEventLoopGroup-1-4] [] [tid=] [sid=] [] [] TRACE io.micronaut.http.client.netty.DefaultHttpClient - Content-Type: text/plain
2021-09-02 CEST 16:39:26,528 [default-nioEventLoopGroup-1-4] [] [tid=] [sid=] [] [] TRACE io.micronaut.http.client.netty.DefaultHttpClient - Accept: application/json,application/x-json-stream
2021-09-02 CEST 16:39:26,528 [default-nioEventLoopGroup-1-4] [] [tid=] [sid=] [] [] TRACE io.micronaut.http.client.netty.DefaultHttpClient - host: localhost:56122
2021-09-02 CEST 16:39:26,528 [default-nioEventLoopGroup-1-4] [] [tid=] [sid=] [] [] TRACE io.micronaut.http.client.netty.DefaultHttpClient - connection: keep-alive
2021-09-02 CEST 16:39:26,529 [default-nioEventLoopGroup-1-4] [] [tid=] [sid=] [] [] TRACE io.micronaut.http.client.netty.DefaultHttpClient - content-length: 99
2021-09-02 CEST 16:39:26,529 [default-nioEventLoopGroup-1-4] [] [tid=] [sid=] [] [] TRACE io.micronaut.http.client.netty.DefaultHttpClient - Request Body
2021-09-02 CEST 16:39:26,529 [default-nioEventLoopGroup-1-4] [] [tid=] [sid=] [] [] TRACE io.micronaut.http.client.netty.DefaultHttpClient - ----