akka / akka-http

The Streaming-first HTTP server/module of Akka
https://doc.akka.io/libraries/akka-http/current/

akka http server hang! #2513

Closed kkang2 closed 5 years ago

kkang2 commented 5 years ago

== use java ==

The overall structure is shown below.

The timing varies, but within an hour the connections end up in the "close-wait" state and the server can no longer receive client requests.

Finally, only L4 health check requests remain.

tcp 0 0 serverip:29999 LP_HEALTH_CHECK_INNER_IP:20028 SYN_RECV
tcp 0 0 serverip:29999 LP_HEALTH_CHECK_INNER_IP:35298 SYN_RECV
tcp 0 0 serverip:29999 LP_HEALTH_CHECK_INNER_IP:34611 SYN_RECV
tcp 0 0 serverip:29999 LP_HEALTH_CHECK_INNER_IP:21245 SYN_RECV
tcp 0 0 serverip:29999 LP_HEALTH_CHECK_INNER_IP:35338 SYN_RECV
tcp 0 0 serverip:29999 LP_HEALTH_CHECK_INNER_IP:21588 SYN_RECV
tcp 0 0 serverip:29999 LP_HEALTH_CHECK_INNER_IP:24681 SYN_RECV

== server code ==
path(
    PathMatchers.segment("api").slash("something").slash("something"),
    () -> post(() ->
        entity(Jackson.unmarshaller(DangerousSituationWarningDomain.class), domain ->
            extractRequest(req ->
                extractScheme(scheme ->
                    extractClientIP(remoteAddr -> {
                        log.info("header info : {}", req.getHeaders());
                        return complete(handler.execute(domain)); // <= returns immediately (async)
                    })
                )
            )
        )
    )
),

The server's internal logic then uses the akka http client synchronously. It makes two external HTTP requests. <= many requests

== code ==
CompletionStage stage = http.singleRequest(
    HttpRequest.create(reqUrl)
        .withMethod(HttpMethods.POST)
        .withEntity(ContentTypes.APPLICATION_JSON, ByteString.fromString(paramJson)));

CompletableFuture future = stage.toCompletableFuture();
........
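
If the elided part blocks on that future (for example with get() or join()), every in-flight request holds a thread until both external calls have finished. A minimal sketch of the non-blocking alternative, chaining two calls with thenCompose. `callExternal` is a hypothetical stand-in for `http.singleRequest` so that the sketch runs with only the JDK, and the URLs are placeholders, not the real endpoints.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ChainedCalls {
    // Hypothetical stand-in for http.singleRequest(...): completes asynchronously with a fake body.
    static CompletionStage<String> callExternal(String url, String payload) {
        return CompletableFuture.supplyAsync(() -> url + " <- " + payload);
    }

    // Chains two external calls without blocking the calling thread.
    static CompletionStage<String> execute(String paramJson) {
        return callExternal("http://first.example/api", paramJson)
            .thenCompose(first -> callExternal("http://second.example/api", first))
            .exceptionally(err -> "failed: " + err.getMessage());
    }

    public static void main(String[] args) {
        // join() is used here only so the demo waits before the JVM exits.
        System.out.println(execute("{}").toCompletableFuture().join());
    }
}
```

The resulting CompletionStage can be handed on to the caller as-is, so no thread has to wait for the responses.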

== akka http config ==
akka {
loglevel = "DEBUG"
loggers = ["akka.event.slf4j.Slf4jLogger"]
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

stdout-loglevel = "DEBUG"

http {
    server {
        remote-address-header = on
    }   
    client {
        connecting-timeout = 5s
    }   
}   

host-connection-pool {
    # The maximum number of parallel connections that a connection pool to a
    # single host endpoint is allowed to establish. Must be greater than zero.
    max-connections = 12

    # The maximum number of open requests accepted into the pool across all 
    # materializations of any of its client flows.
    # Protects against (accidentally) overloading a single pool with too many client flow materializations.
    # Note that with N concurrent materializations the max number of open request in the pool
    # will never exceed N * max-connections * pipelining-limit.
    # Must be a power of 2 and > 0!
    max-open-requests = 64
}   

}

The clients use URLConnection and send keep-alive headers. In testing, a client kept using a single connection for more than 40 minutes.

In this deployment the server and client are on different networks. Whenever I used the akka http server before, the client was always on the same network as the server.

This is the first time I've experienced this situation.

How can I solve this situation?

jrudolph commented 5 years ago

Hi @kkang2, thanks for the report.

The network status is "close-wait" within an hour and the server is not able to receive the client request.

What are the error messages on the client in that case? Is it not able to open a TCP connection any more? Or can it open a TCP connection and HTTP requests are not responded to any more? Or is it on an existing keep-alive connection that new requests are not answered any more?

If you get into such a situation, can you post the output of connecting to the service with curl -v <url>? A tcpdump would be even better.

Can you also check what the server application is doing in such a case by using jstack <pid> when it happens?
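
When attaching jstack is inconvenient, a similar check can be made from inside the JVM. The following is only a sketch under that assumption, not a replacement for a full thread dump: it lists the threads that currently have a frame in classes with a given name prefix. The class name `WaitingThreads` and the prefix used in `main` are placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class WaitingThreads {
    // Returns "name [STATE]" for every live thread with a stack frame in a class starting with classPrefix.
    static List<String> threadsInside(String classPrefix) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            for (StackTraceElement frame : e.getValue()) {
                if (frame.getClassName().startsWith(classPrefix)) {
                    names.add(e.getKey().getName() + " [" + e.getKey().getState() + "]");
                    break; // one match per thread is enough
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Placeholder prefix; substitute the package of the library you suspect.
        threadsInside("java.util.concurrent").forEach(System.out::println);
    }
}
```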

kkang2 commented 5 years ago
  1. client error stacktrace
java.net.SocketTimeoutException: Read timed out
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735)
        at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1587)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
        at Test.getHttpByURLConnection(Test.java:187)
        at Test.main(Test.java:198)
  2. curl : the request hangs.
curl -v http://L4_IP:29999/api/vse/warning -H "Content-Type:application/json;charset=utf-8" -d '{
>     "header" : {
>         "interfaceId" : "IF1001",
>         "cnt" : 1
>     },
>     "body" : [
>         {
>             "centerId" : "CT0000000001",
>             "centerName" : "test",
>             "eventId" : "EI0000111122",
>             "eventType" : 1286,
>             "eventTime" : "20190425231020"
>         }
>     ]
> }'
* About to connect() to L4_IP port 29999 (#0)
*   Trying L4_IP...
* Connected to L4_IP (L4_IP) port 29999 (#0)
> POST /api/vse/warning HTTP/1.1
> User-Agent: curl/7.29.0
> Host: L4_IP:29999
> Accept: */*
> Content-Type:application/json;charset=utf-8
> Content-Length: 979
> 
* upload completely sent off: 979 out of 979 bytes
=== after 20 minutes ===
* Recv failure: Connection reset by peer
* Closing connection 0
curl: (56) Recv failure: Connection reset by peer
  3. telnet : server is alive..

telnet server1-IP 29999
Trying server1-IP...
Connected to server1-IP.
Escape character is '^]'.

  4. I am still analyzing the tcpdump and jstack output. I will post them later.

kkang2 commented 5 years ago

The cause has been identified. The jstack output showed that the problem was in the MQTT call. The default value of MqttClient's timeToWait is -1, which means wait indefinitely. I have set it to 5000 (5 s), so the call no longer blocks forever.

Thank you for your help. ^^

Thread 80566: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Compiled frame; information may be imprecise)
 - java.lang.Object.wait() @bci=2, line=502 (Compiled frame)
 - **org.eclipse.paho.client.mqttv3.internal.Token.waitForResponse(long) @bci=163, line=143 (Compiled frame)**
 - org.eclipse.paho.client.mqttv3.internal.Token.waitForCompletion(long) @bci=43, line=108 (Compiled frame)
 - org.eclipse.paho.client.mqttv3.MqttToken.waitForCompletion(long) @bci=5, line=67 (Compiled frame)
 - org.eclipse.paho.client.mqttv3.MqttClient.publish(java.lang.String, org.eclipse.paho.client.mqttv3.MqttMessage) @bci=15, line=583 (Compiled frame)
 - com.xxx.xxx.kec.connector.MqttConnector.publishMsgFromArray(java.util.List, java.lang.String) @bci=100, line=71 (Compiled frame)
 - com.xxx.xxx.kec.actor.EtcActor.execute(com.skt.cits.kec.domain.SoftShoulderStopVehicleWarningDomain) @bci=474, line=287 (Compiled frame)
 - com.xxx.xxx.kec.actor.EtcActor.lambda$createReceive$2(com.skt.cits.kec.domain.SoftShoulderStopVehicleWarningDomain) @bci=12, line=63 (Compiled frame)
 - com.xxx.xxx.kec.actor.EtcActor$$Lambda$554.apply(java.lang.Object) @bci=8 (Compiled frame)
 - akka.japi.pf.UnitCaseStatement.apply(java.lang.Object) @bci=5, line=24 (Compiled frame)
 - akka.japi.pf.UnitCaseStatement.apply(java.lang.Object) @bci=2, line=20 (Compiled frame)
 - scala.PartialFunction.applyOrElse(java.lang.Object, scala.Function1) @bci=12, line=127 (Compiled frame)
 - scala.PartialFunction.applyOrElse$(scala.PartialFunction, java.lang.Object, scala.Function1) @bci=3, line=126 (Compiled frame)
 - akka.japi.pf.UnitCaseStatement.applyOrElse(java.lang.Object, scala.Function1) @bci=3, line=20 (Compiled frame)
 - scala.PartialFunction$OrElse.applyOrElse(java.lang.Object, scala.Function1) @bci=11, line=175 (Compiled frame)
 - scala.PartialFunction$OrElse.applyOrElse(java.lang.Object, scala.Function1) @bci=35, line=176 (Compiled frame)
 - scala.PartialFunction$OrElse.applyOrElse(java.lang.Object, scala.Function1) @bci=35, line=176 (Compiled frame)
 - akka.actor.Actor.aroundReceive(scala.PartialFunction, java.lang.Object) @bci=8, line=539 (Compiled frame)
 - akka.actor.Actor.aroundReceive$(akka.actor.Actor, scala.PartialFunction, java.lang.Object) @bci=3, line=537 (Compiled frame)
 - akka.actor.AbstractActor.aroundReceive(scala.PartialFunction, java.lang.Object) @bci=3, line=226 (Compiled frame)
 - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=15, line=610 (Compiled frame)
 - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=64, line=579 (Compiled frame)
 - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=268 (Compiled frame)
 - akka.dispatch.Mailbox.run() @bci=20, line=229 (Compiled frame)
 - akka.dispatch.Mailbox.exec() @bci=1, line=241 (Compiled frame)
 - akka.dispatch.forkjoin.ForkJoinTask.doExec() @bci=10, line=260 (Compiled frame)
 - akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(akka.dispatch.forkjoin.ForkJoinTask) @bci=10, line=1339 (Compiled frame)
 - akka.dispatch.forkjoin.ForkJoinPool.runWorker(akka.dispatch.forkjoin.ForkJoinPool$WorkQueue) @bci=11, line=1979 (Compiled frame)
 - akka.dispatch.forkjoin.ForkJoinWorkerThread.run() @bci=14, line=107 (Interpreted frame)
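
The trace above shows an actor's receive blocked inside MqttClient.publish on a dispatcher (ForkJoinPool) thread. Besides setting timeToWait, a common mitigation is to run blocking calls on a dedicated pool and bound how long the caller waits. A minimal sketch, assuming Java 9+ for completeOnTimeout. `blockingPublish` is a hypothetical stand-in for the real publish call, and the pool size and timeouts are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedBlockingCall {
    // Dedicated pool for blocking work, so request-handling threads stay free.
    static final ExecutorService BLOCKING_POOL = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r, "blocking-io");
        t.setDaemon(true); // do not keep the JVM alive for the demo
        return t;
    });

    // Hypothetical stand-in for a blocking publish that takes `millis` to return.
    static String blockingPublish(long millis) {
        try {
            Thread.sleep(millis);
            return "published";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // Runs the blocking call off the caller's thread and stops waiting after timeoutMillis.
    static CompletableFuture<String> publishWithTimeout(long workMillis, long timeoutMillis) {
        return CompletableFuture
            .supplyAsync(() -> blockingPublish(workMillis), BLOCKING_POOL)
            .completeOnTimeout("timed out", timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println(publishWithTimeout(10, 1000).join());  // fast call completes normally
        System.out.println(publishWithTimeout(2000, 100).join()); // slow call hits the timeout
    }
}
```

The timeout here only releases the caller. The pool thread stays occupied until the blocking call itself returns, so a timeout inside the client library (such as timeToWait) is still needed.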
jrudolph commented 5 years ago

Cool, glad you figured it out.

You might want to have a look at the Alpakka MQTT connector which already implements all the nitty-gritty asynchronous bits.