akka / akka

Build highly concurrent, distributed, and resilient message-driven applications on the JVM
https://akka.io

Akka Flow hangs when making multiple http requests via connection pool #20460

Closed unoexperto closed 8 years ago

unoexperto commented 8 years ago

I'm using Akka 2.4.4 and trying (so far unsuccessfully) to move from Apache HttpAsyncClient.

Below is a simplified version of the code I use in my project.

The problem is that it hangs if I send more than 1-3 requests to the flow. After 6 hours of debugging I still couldn't locate the problem. I don't see any exceptions, error logs, or events in the Decider. NOTHING :)

I tried reducing the connection-timeout setting to 1s, thinking that maybe it's waiting for a response from the server, but it didn't help.

What am I doing wrong?

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Referer
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Supervision.Decider
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import com.typesafe.config.ConfigFactory

import scala.collection.immutable.{Seq => imSeq}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try

object Main {

  implicit val system = ActorSystem("root")
  // cachedHostConnectionPool and runWith both need an implicit Materializer
  implicit val materializer = ActorMaterializer()
  implicit val executor = system.dispatcher
  val config = ConfigFactory.load()

  private val baseDomain = "www.google.com"
  private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config))

  private val decider: Decider = {
    case ex =>
      ex.printStackTrace()
      Supervision.Stop
  }

  private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] =

    Source.fromIterator(() => items.toIterator)
      .via(poolClientFlow)
      .log("Logger")(log = system.log)
      .recoverWith {
        case ex =>
          println(ex)
          Source.empty // complete the stream; null is not a valid replacement Source
      }
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.seq)
      .map { v =>
        println(s"Got ${v.length} responses in Flow")
        v.asInstanceOf[Seq[(Try[HttpResponse], T)]]
      }

  def main(args: Array[String]): Unit = {

    val headers = imSeq(Referer("https://www.google.com/"))
    val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID"
    val requests = List.fill(10)(reqPair)
    val qwe = sendMultipleRequests(requests).map { case responses =>
      println(s"Got ${responses.length} responses")

      system.terminate()
    }

    Await.ready(system.whenTerminated, Duration.Inf)
  }
}

Also, what's up with proxy support? It doesn't seem to work for me either.

ktoso commented 8 years ago

You MUST consume response.entity.dataBytes. Currently you open connections but never read from them, so back-pressure kicks in, nothing is read from the connections, and they are left to die from idle timeouts within a few seconds.
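
A minimal sketch of what consuming the entity can look like when the body itself is not needed, assuming the Akka 2.4/2.5-era Scala DSL with an implicit ActorMaterializer in scope (drainEntity is a hypothetical helper, not a library function): run entity.dataBytes into Sink.ignore so the pooled connection can move on to the next request. The response is built in memory from a Source so the snippet runs offline.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object DrainExample {
  implicit val system: ActorSystem = ActorSystem("drain-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Hypothetical helper: read and discard every chunk of the response body so the
  // pooled connection can be reused for the next request.
  def drainEntity(response: HttpResponse): Future[Done] =
    response.entity.dataBytes.runWith(Sink.ignore)

  def main(args: Array[String]): Unit = {
    // A streamed (chunked) entity built in memory, standing in for one read from the network.
    val body = Source(List(ByteString("hello, "), ByteString("world")))
    val response = HttpResponse(entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, body))

    println(Await.result(drainEntity(response), 3.seconds)) // Done
    system.terminate()
  }
}
```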

There is no proxy support yet: https://github.com/akka/akka/issues/16853

unoexperto commented 8 years ago

Sorry, I left out the part where I consume the response. Here is how it looks:

private def parseResponse[TR](response: Future[(Try[HttpResponse], RequestContext)], redirectCount: Int = 0)(implicit unmarshaller: FromEntityUnmarshaller[TR]): Future[TR] =
  response.flatMap { case (tryResp, reqContext) =>

    tryResp match {
      case Success(res) =>
        res.status match {
          case OK =>
            unmarshaller(res.entity).recoverWith {
              case ex =>
                Unmarshal(res.entity).to[String].flatMap { body =>
                  Future.failed(new IOException(s"Failed to unmarshal with ${ex.getMessage} and response body is\n $body"))
                }
            }
          case Found =>
            res.header[Location] match {
              case Some(value) =>
                if (redirectCount > 1)
                  Future.failed(new RuntimeException(s"Possible redirect loop? Redirect count is $redirectCount. Location is ${value.uri.toString()}"))
                else {
                  val newCookies = res.headers.filter(_.isInstanceOf[`Set-Cookie`]).map { v =>
                    val cookie = v.asInstanceOf[`Set-Cookie`].cookie
                    HttpCookiePair.apply(cookie.name, cookie.value)
                  }
                  parseResponse(createRequest(value.uri.toRelative, reqContext, imSeq(Cookie(newCookies))), redirectCount + 1)(unmarshaller)
                }

              case None =>
                Future.failed(new IOException(s"Got HTTP 302 response but Location header is missing"))
            }
          case _ =>
            Unmarshal(res.entity).to[String].flatMap { body =>
              Future.failed(new IOException(s"The response status is ${res.status} and response body is $body"))
            }
        }
      case Failure(ex) =>
        Future.failed(ex)
    }
  }

But it never gets called in Future[(Try[HttpResponse], T)].map { }. I guess back-pressure should kick in once the number of responses exceeds the request limit, right?

And nothing dies from a timeout. I left the code running for ~4 hours.
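
A likely reason back-pressure turns into a hang here, as far as I understand the pool: with the default max-connections = 4, a pooled connection only takes the next request after the body of its current response has been read. Sink.seq completes only after all 10 responses have been emitted, and parseResponse only reads bodies after that, so after 4 responses nothing is being read and nothing more can be sent (the log posted later in the thread shows exactly 4 responses arriving). Below is a minimal sketch of reading each body inside the stream, before collecting. fetchAll and fakePool are hypothetical names; fakePool only stands in for Http().cachedHostConnectionPool[T](...) so the example runs offline, and toStrict buffers each body in memory, which assumes small responses.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object FetchAll {
  implicit val system: ActorSystem = ActorSystem("fetch-all")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Hypothetical helper: send every request through `pool` and read each body
  // inside the stream, so a connection is released as soon as its body is read.
  def fetchAll[T](
      requests: immutable.Seq[(HttpRequest, T)],
      pool: Flow[(HttpRequest, T), (Try[HttpResponse], T), Any]
  ): Future[immutable.Seq[(Try[String], T)]] =
    Source(requests)
      .via(pool)
      .mapAsyncUnordered(parallelism = 4) {
        case (Success(response), ctx) =>
          val body: Future[Try[String]] =
            response.entity.toStrict(10.seconds)
              .map(strict => Success(strict.data.utf8String))
              .recover { case ex => Failure(ex) }
          body.map(b => (b, ctx))
        case (Failure(ex), ctx) =>
          Future.successful((Failure(ex), ctx))
      }
      .runWith(Sink.seq)

  // Stand-in for Http().cachedHostConnectionPool[T](host, port), so this runs offline.
  def fakePool[T]: Flow[(HttpRequest, T), (Try[HttpResponse], T), Any] =
    Flow[(HttpRequest, T)].map { case (req, ctx) =>
      val body = Source.single(ByteString(s"body of ${req.uri}"))
      (Success(HttpResponse(entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, body))), ctx)
    }

  def main(args: Array[String]): Unit = {
    val requests = (1 to 10).toList.map(i => HttpRequest(uri = s"/item/$i") -> i)
    val results = Await.result(fetchAll(requests, fakePool[Int]), 5.seconds)
    results.sortBy(_._2).foreach(println)
    system.terminate()
  }
}
```

With the real pool, the mapAsyncUnordered step is what keeps all four connections cycling, because every body is read as soon as its response arrives.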

unoexperto commented 8 years ago

Any ideas what I'm doing wrong, Konrad? Would you like me to create a separate project with this code?

Thanks!

ktoso commented 8 years ago

I'll look into it, but not right now. Please be patient for a while; I have some other urgent things on my hands.

ktoso commented 8 years ago

A self-contained reproducer app would help a lot to get into this quicker, thanks.

unoexperto commented 8 years ago

Here we go: https://github.com/cppexpert/akka_flow_freezing (I added you as a collaborator just in case).

I've changed the code a little bit: instead of using Sink.seq, I switched to Source.queue. It seems more natural to use.

And this is the output I get:

[DEBUG] [05/06/2016 14:17:17.831] [main] [EventStream(akka://root)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/06/2016 14:17:17.832] [main] [EventStream(akka://root)] Default Loggers started
[DEBUG] [05/06/2016 14:17:18.105] [main] [AkkaSSLConfig(akka://root)] Initializing AkkaSSLConfig extension...
[DEBUG] [05/06/2016 14:17:18.107] [main] [AkkaSSLConfig(akka://root)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@1ac85b0c
[DEBUG] [05/06/2016 14:17:18.406] [root-akka.actor.default-dispatcher-5] [akka://root/user/pool-master/PoolInterfaceActor-0] (Re-)starting host connection pool to www.zzzz.com:80
Request enqueued 2
Request enqueued 3
Request enqueued 1
Request enqueued 4
Request enqueued 5
Request enqueued 6
Request enqueued 7
Request enqueued 8
Request enqueued 9
[DEBUG] [05/06/2016 14:17:18.653] [root-akka.actor.default-dispatcher-2] [akka://root/user/SlotProcessor-1] become unconnected, from subscriber pending
[DEBUG] [05/06/2016 14:17:18.653] [root-akka.actor.default-dispatcher-9] [akka://root/user/SlotProcessor-2] become unconnected, from subscriber pending
[DEBUG] [05/06/2016 14:17:18.653] [root-akka.actor.default-dispatcher-8] [akka://root/user/SlotProcessor-0] become unconnected, from subscriber pending
[DEBUG] [05/06/2016 14:17:18.653] [root-akka.actor.default-dispatcher-7] [akka://root/user/SlotProcessor-3] become unconnected, from subscriber pending
[DEBUG] [05/06/2016 14:17:18.708] [root-akka.actor.default-dispatcher-15] [akka://root/system/IO-TCP/selectors/$a/0] Attempting connection to [www.zzz.com/172.227.100.160:80]
[DEBUG] [05/06/2016 14:17:18.712] [root-akka.actor.default-dispatcher-12] [akka://root/system/IO-TCP/selectors/$a/2] Attempting connection to [www.zzz.com/172.227.100.160:80]
[DEBUG] [05/06/2016 14:17:18.713] [root-akka.actor.default-dispatcher-16] [akka://root/system/IO-TCP/selectors/$a/1] Attempting connection to [www.zzz.com/172.227.100.160:80]
[DEBUG] [05/06/2016 14:17:18.713] [root-akka.actor.default-dispatcher-12] [akka://root/system/IO-TCP/selectors/$a/3] Attempting connection to [www.zzz.com/172.227.100.160:80]
[DEBUG] [05/06/2016 14:17:18.758] [root-akka.actor.default-dispatcher-7] [akka://root/system/IO-TCP/selectors/$a/0] Connection established to [www.zzz.com/172.227.100.160:80]
[DEBUG] [05/06/2016 14:17:18.758] [root-akka.actor.default-dispatcher-21] [akka://root/system/IO-TCP/selectors/$a/2] Connection established to [www.zzz.com/172.227.100.160:80]
[DEBUG] [05/06/2016 14:17:18.762] [root-akka.actor.default-dispatcher-23] [akka://root/system/IO-TCP/selectors/$a/3] Connection established to [www.zzz.com/172.227.100.160:80]
[DEBUG] [05/06/2016 14:17:18.762] [root-akka.actor.default-dispatcher-25] [akka://root/system/IO-TCP/selectors/$a/1] Connection established to [www.zzz.com/172.227.100.160:80]
Response was received 2
Response was received 3
Response was received 1
Response was received 4

lolski commented 8 years ago

Let me have a quick look, maybe I'll be able to find something

lolski commented 8 years ago

@cppexpert In your Git example, you never consume response.entity.dataBytes. If you modify your function as below, it will work:

private val poolClientFlow = initialize()
private val queue = Source.queue[(HttpRequest, (Any, Promise[(Try[HttpResponse], Any)]))](1000, OverflowStrategy.backpressure)
  .via(poolClientFlow)
  .toMat(Sink.foreach({
    case (triedResp, (value: Any, p: Promise[(Try[HttpResponse], Any)])) =>
      println(s"Response was received ${value.toString}")
      val x = triedResp.get.entity.dataBytes.toMat(Sink.seq)(Keep.right).run() // consume dataBytes, which is also a source
      x map { e => p.success(triedResp -> value); println(e) }
      p.success(triedResp -> value)
    case _ =>
      throw new RuntimeException()
  }))(Keep.left)
  .run

unoexperto commented 8 years ago

I've updated the Git repo with your code and also switched to an HTTPS connection to avoid the HTTP 302 response.

It's still not working, although the behavior has changed.

Could you please explain why you consume the bytes in toMat if I'm supposed to consume the data in parseResponse, where the response is unmarshalled using a FromEntityUnmarshaller? Doesn't that mean one or the other will fail because there are no bytes left in the response stream?

Here is the output I get with the new version of the code:

[DEBUG] [05/08/2016 14:11:18.555] [main] [EventStream(akka://root)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/08/2016 14:11:18.556] [main] [EventStream(akka://root)] Default Loggers started
[DEBUG] [05/08/2016 14:11:18.755] [main] [AkkaSSLConfig(akka://root)] Initializing AkkaSSLConfig extension...
[DEBUG] [05/08/2016 14:11:18.757] [main] [AkkaSSLConfig(akka://root)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@548e76f1
[DEBUG] [05/08/2016 14:11:19.006] [root-akka.actor.default-dispatcher-4] [akka://root/user/pool-master/PoolInterfaceActor-0] (Re-)starting host connection pool to www.zerobin.net:443
Request enqueued 2
Request enqueued 1
Request enqueued 3
Request enqueued 5
Request enqueued 4
Request enqueued 6
Request enqueued 7
Request enqueued 8
Request enqueued 10
Request enqueued 9
[DEBUG] [05/08/2016 14:11:19.208] [root-akka.actor.default-dispatcher-10] [akka://root/user/SlotProcessor-0] become unconnected, from subscriber pending
[DEBUG] [05/08/2016 14:11:19.208] [root-akka.actor.default-dispatcher-15] [akka://root/user/SlotProcessor-2] become unconnected, from subscriber pending
[DEBUG] [05/08/2016 14:11:19.208] [root-akka.actor.default-dispatcher-9] [akka://root/user/SlotProcessor-1] become unconnected, from subscriber pending
[DEBUG] [05/08/2016 14:11:19.210] [root-akka.actor.default-dispatcher-3] [akka://root/user/SlotProcessor-3] become unconnected, from subscriber pending
[DEBUG] [05/08/2016 14:11:19.250] [root-akka.actor.default-dispatcher-12] [akka://root/system/IO-TCP/selectors/$a/1] Attempting connection to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.250] [root-akka.actor.default-dispatcher-7] [akka://root/system/IO-TCP/selectors/$a/0] Attempting connection to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.250] [root-akka.actor.default-dispatcher-13] [akka://root/system/IO-TCP/selectors/$a/3] Attempting connection to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.250] [root-akka.actor.default-dispatcher-4] [akka://root/system/IO-TCP/selectors/$a/2] Attempting connection to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.275] [root-akka.actor.default-dispatcher-7] [akka://root/system/IO-TCP/selectors/$a/2] Connection established to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.275] [root-akka.actor.default-dispatcher-12] [akka://root/system/IO-TCP/selectors/$a/0] Connection established to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.276] [root-akka.actor.default-dispatcher-4] [akka://root/system/IO-TCP/selectors/$a/1] Connection established to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.276] [root-akka.actor.default-dispatcher-24] [akka://root/system/IO-TCP/selectors/$a/3] Connection established to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.406] [root-akka.actor.default-dispatcher-20] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: hostname = www.zerobin.net, sessionId (base64) = QSsq+F7N9c5p/FQjecfSMSJUKmUK3PPvDjcXGF/2sVk=
[DEBUG] [05/08/2016 14:11:19.406] [root-akka.actor.default-dispatcher-11] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: hostname = www.zerobin.net, sessionId (base64) = ilTf02/PUDP4QzLfOkpHkeno0xNKd/YtzS+le9CqoPc=
[DEBUG] [05/08/2016 14:11:19.406] [root-akka.actor.default-dispatcher-3] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: hostname = www.zerobin.net, sessionId (base64) = g46Ogf5rF8JuhizuDg5jov4malxaFLlr0wTMVzRvHvc=
[DEBUG] [05/08/2016 14:11:19.406] [root-akka.actor.default-dispatcher-11] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: returning true
[DEBUG] [05/08/2016 14:11:19.406] [root-akka.actor.default-dispatcher-20] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: returning true
[DEBUG] [05/08/2016 14:11:19.407] [root-akka.actor.default-dispatcher-13] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: hostname = www.zerobin.net, sessionId (base64) = ZrfJp3AE3QytMET/WPb+XyRAPiEAPZPnaICVgEW+HbY=
[DEBUG] [05/08/2016 14:11:19.407] [root-akka.actor.default-dispatcher-13] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: returning true
[DEBUG] [05/08/2016 14:11:19.407] [root-akka.actor.default-dispatcher-3] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: returning true
[WARN] [05/08/2016 14:11:19.897] [root-akka.actor.default-dispatcher-13] [akka.actor.ActorSystemImpl(root)] Illegal response header: Illegal 'strict-transport-security' header: Invalid input 'p', expected WSP, CRLF or 'i' (line 1, column 19): max-age=15552000; preload
                  ^
[WARN] [05/08/2016 14:11:19.897] [root-akka.actor.default-dispatcher-24] [akka.actor.ActorSystemImpl(root)] Illegal response header: Illegal 'strict-transport-security' header: Invalid input 'p', expected WSP, CRLF or 'i' (line 1, column 19): max-age=15552000; preload
                  ^
[WARN] [05/08/2016 14:11:19.897] [root-akka.actor.default-dispatcher-15] [akka.actor.ActorSystemImpl(root)] Illegal response header: Illegal 'strict-transport-security' header: Invalid input 'p', expected WSP, CRLF or 'i' (line 1, column 19): max-age=15552000; preload
                  ^
Response was received 1
Response was received 3
Received 1 bytes for ID 1
Response was received 4
Received 1 bytes for ID 3
Received 1 bytes for ID 4
[WARN] [05/08/2016 14:11:19.948] [root-akka.actor.default-dispatcher-5] [akka.actor.ActorSystemImpl(root)] Illegal response header: Illegal 'strict-transport-security' header: Invalid input 'p', expected WSP, CRLF or 'i' (line 1, column 19): max-age=15552000; preload
                  ^
Response was received 2
Received 1 bytes for ID 2
Response was received 6
Received 1 bytes for ID 6
Response was received 5
Received 1 bytes for ID 5
Response was received 9
Response was received 10
Received 1 bytes for ID 9
Received 1 bytes for ID 10
Response was received 7
Response was received 8
Received 1 bytes for ID 7
Received 1 bytes for ID 8
Got 10 responses
Parsing items ID 1
Parsing items ID 2
Parsing items ID 3
Parsing items ID 4
Parsing items ID 5
Parsing items ID 6
Parsing items ID 7
Parsing items ID 8
Parsing items ID 9
Parsing items ID 10
[ERROR] [05/08/2016 14:11:20.467] [root-akka.actor.default-dispatcher-21] [akka://root/user/StreamSupervisor-0/flow-26-0-unknown-operation] Error in stage [akka.http.impl.util.StreamUtils$$anon$2@490bf2d9]: Promise already completed.
java.lang.IllegalStateException: Promise already completed.
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)

There are several weird things here:

  1. It never reaches the point in testCall2() where it prints Got ${newsHtml.toString.length} characters in html for index $index, so my code never parses the response.
  2. The map in val queue = Source.queue prints that only 1 byte is received. Does that mean somebody has already consumed the response?
  3. I get a Promise already completed exception for each request, although I don't see p.success being called more than once.
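
Possible explanations for items 2 and 3, offered as a sketch rather than a confirmed diagnosis. For 3: in the handler from lolski's comment, p.success is called inside x map { ... } and again, unconditionally, on the next line, so each promise is completed twice, and a second success on a completed Promise throws IllegalStateException. The logged stack trace points into an Akka HTTP stream stage, though, so that error may also (or instead) come from the response bytes being read a second time in parseResponse; the log alone doesn't settle it. For 2: if the Received N bytes line prints the length of the Seq[ByteString] materialized by Sink.seq (an assumption, since that line's code isn't shown), it counts chunks, not bytes, so a body arriving as a single ByteString prints 1. The handler below (handle is a hypothetical name, and Promise[Int] is a simplification of the original promise type) reads the body once, sums chunk lengths with Sink.fold, and completes the promise exactly once.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpEntity, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.util.ByteString

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object SingleCompletion {
  implicit val system: ActorSystem = ActorSystem("single-completion")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Hypothetical handler: read the body once, count its bytes (not its chunks),
  // and complete the promise exactly once, with either the count or the failure.
  def handle(triedResp: Try[HttpResponse], p: Promise[Int]): Unit = triedResp match {
    case Success(resp) =>
      val byteCount: Future[Int] =
        resp.entity.dataBytes.runWith(Sink.fold[Int, ByteString](0)((acc, chunk) => acc + chunk.length))
      p.completeWith(byteCount) // the only place this promise is completed
    case Failure(ex) =>
      p.failure(ex)
  }

  def main(args: Array[String]): Unit = {
    val p = Promise[Int]()
    handle(Success(HttpResponse(entity = HttpEntity("hello"))), p)
    println(Await.result(p.future, 3.seconds)) // 5
    // Completing the same promise a second time throws IllegalStateException.
    println(Try(p.success(0)).isFailure) // true
    system.terminate()
  }
}
```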

lolski commented 8 years ago

@cppexpert You are right: I oversimplified the example and commented out the parsing part completely, which is why it worked for me. The workaround for the issue seems to be to convert the entity to its strict counterpart: val en = Await.result(res.entity.toStrict(10 seconds), 10 seconds).

Since I am not a collaborator, I have created a PR to your repo with the workaround (https://github.com/cppexpert/akka_flow_freezing/pull/1). Please let us know if it works. If it does, then we have to check why the unmarshaller does not work properly on a non-strict HttpResponse.

cc: @ktoso

unoexperto commented 8 years ago

Thank you for your help, @lolski. I'm afraid blocking code is not a solution for me. Let's see what more "reactive" way the maintainers will offer.

Another workaround I see is to change the Source.queue definition to:

  private type RequestParamType = (Try[HttpResponse] => Try[Any], Any, Promise[(Try[Any], Any)])
  private val queue = Source.queue[(HttpRequest, RequestParamType)](1000, OverflowStrategy.backpressure)
    .via(poolClientFlow)
    .toMat(Sink.foreach({
      case (triedResp, (parser : (Try[HttpResponse] => Try[Any]), value : Any, p : Promise[(Try[Any], Any)]) ) =>
        println(s"Response was received ${value.toString}")
        p.success(parser(triedResp) -> value)
      case _ =>
        throw new RuntimeException()
    }))(Keep.left)
    .run

But my god, it's even uglier than what I have with Apache HttpClient. Passing a promise AND a non-type-safe converter for HttpResponse looks awful.
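
One way to keep the queue type-safe, sketched under assumptions: the queue carries only a Promise[HttpResponse], completes it with a copy of the response whose body has been buffered with toStrict (so the connection is released right away), and leaves typed unmarshalling to the caller through ordinary Future combinators. TypedQueueClient, request and the fake pool are hypothetical names; the fake pool stands in for Http().cachedHostConnectionPool[Ctx](...) so the snippet runs offline, and buffering assumes bodies small enough to hold in memory.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, HttpResponse}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object TypedQueueClient {
  implicit val system: ActorSystem = ActorSystem("typed-queue")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  type Ctx = Promise[HttpResponse]

  // Stand-in for Http().cachedHostConnectionPool[Ctx](host), so this runs offline.
  val pool: Flow[(HttpRequest, Ctx), (Try[HttpResponse], Ctx), Any] =
    Flow[(HttpRequest, Ctx)].map { case (req, p) =>
      (Success(HttpResponse(entity = HttpEntity(s"echo ${req.uri}"))), p)
    }

  // The queue only knows about HttpResponse; no parser or Any travels through it.
  private val queue =
    Source.queue[(HttpRequest, Ctx)](1000, OverflowStrategy.backpressure)
      .via(pool)
      .mapAsyncUnordered(4) {
        case (Success(resp), p) =>
          // Buffer the body so the pooled connection is released right away.
          val strict = resp.entity.toStrict(10.seconds).map(e => resp.withEntity(e))
          p.completeWith(strict)
          strict.map(_ => ()).recover { case _ => () }
        case (Failure(ex), p) =>
          p.failure(ex)
          Future.successful(())
      }
      .toMat(Sink.ignore)(Keep.left)
      .run()

  def request(req: HttpRequest): Future[HttpResponse] = {
    val p = Promise[HttpResponse]()
    queue.offer(req -> p).flatMap {
      case QueueOfferResult.Enqueued => p.future
      case other => Future.failed(new RuntimeException(s"Request not enqueued: $other"))
    }
  }

  def main(args: Array[String]): Unit = {
    // Typing happens at the call site, via whichever unmarshaller the caller picks.
    val body: Future[String] =
      request(HttpRequest(uri = "/a")).flatMap(resp => Unmarshal(resp.entity).to[String])
    println(Await.result(body, 5.seconds)) // echo /a
    system.terminate()
  }
}
```

A caller that wants a domain type would swap the String unmarshaller for its own FromEntityUnmarshaller at the call site; the queue itself never changes.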

lolski commented 8 years ago

@cppexpert It doesn't mean we have to block using Await.result. That was just an example. toStrict returns a Future, so this will work too:

val enFut = res.entity.toStrict(10 seconds)
enFut map { ... }
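
Expanding that idea into a runnable sketch, which also bears on the earlier question about consuming the body twice: as far as I know, the dataBytes of an entity received from a connection can be materialized only once, so reading them in Sink.foreach and then again in an unmarshaller cannot both succeed. After toStrict the whole body sits in memory as an HttpEntity.Strict, which can be unmarshalled as often as needed without blocking. readTwice is a hypothetical name, and the in-memory chunked entity only stands in for one read from the network (unlike a network entity, it could be re-read even without toStrict).

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, ResponseEntity}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object StrictEntityExample {
  implicit val system: ActorSystem = ActorSystem("strict-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Hypothetical helper: buffer the body once (non-blocking), then unmarshal the
  // strict copy twice; each Unmarshal call reads the in-memory bytes again.
  def readTwice(entity: ResponseEntity): Future[(String, String)] =
    entity.toStrict(10.seconds).flatMap { strict =>
      for {
        first  <- Unmarshal(strict).to[String]
        second <- Unmarshal(strict).to[String]
      } yield (first, second)
    }

  def main(args: Array[String]): Unit = {
    val chunked = HttpEntity(ContentTypes.`text/plain(UTF-8)`,
      Source(List(ByteString("ab"), ByteString("cd"))))
    println(Await.result(readTwice(chunked), 5.seconds)) // (abcd,abcd)
    system.terminate()
  }
}
```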

@ktoso Could this be a bug in the unmarshalling process? I can investigate further if you think it is.

ktoso commented 8 years ago

Ticket moved to github.com/akka/akka-http
Rationale for the move discussed here: akka/akka-meta#27

If you are interested in contributing or tracking this issue, please comment in akka-http instead from now on :-)