akka / akka-http

The Streaming-first HTTP server/module of Akka
https://doc.akka.io/docs/akka-http

Massive POST requests cause "akka.http.impl.engine.client.OutgoingConnectionBlueprint$UnexpectedConnectionClosureException" #2609

Closed: ChanglinZhou closed this issue 5 years ago.

ChanglinZhou commented 5 years ago

When sending a massive number of POST requests (~10M), the client got many (~400k) "akka.http.impl.engine.client.OutgoingConnectionBlueprint$UnexpectedConnectionClosureException" exceptions.

I tried to find a workaround, and my first idea was to retry. That didn't help, so I dug into the code and found this in PoolInterfaceActor.scala:

    val retries = if (pr.request.method.isIdempotent) hcps.setup.settings.maxRetries else 0

If I understand the behavior correctly, POST requests are never retried because POST is not idempotent, even when the failure is caused by the server closing the connection.

What's the best practice for this scenario?

Thank you!

raboof commented 5 years ago

POST request won't retry, because it's not idempotent, even the failure is caused by server closing the connection

That's correct: we don't automatically retry POST requests, since in general it's not possible to know whether a partially handled request already had a side effect on the server.

If in your application you're sure a retry is safe, I think you can do it explicitly in your user code?
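Doing the retry explicitly in user code can be sketched with plain Scala futures. This is a minimal sketch that assumes the POST is known to be safe to repeat; `ExplicitRetry.retry` and its `shouldRetry` predicate are hypothetical names, not part of akka-http.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ExplicitRetry {
  // Runs `op`, and if it fails with an exception accepted by `shouldRetry`,
  // runs it again, up to `retries` additional times.
  // `op` is a thunk so that every attempt creates a fresh Future
  // (re-using an already-started Future would not resend anything).
  def retry[T](retries: Int, shouldRetry: Throwable => Boolean)(op: () => Future[T])(
      implicit ec: ExecutionContext): Future[T] =
    op().recoverWith {
      case e if retries > 0 && shouldRetry(e) =>
        retry(retries - 1, shouldRetry)(op)
    }
}
```

In the client from this thread, `op` could be `() => Http().singleRequest(request)` with a strict entity such as the string entities used here, so each attempt resends the same bytes. Note that the pool flow (`cachedHostConnectionPool`) reports failures as `Failure(e)` elements rather than as failed futures, so with that flow the retry would have to be wired into the stream instead.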

(this might have been better as a topic on https://discuss.akka.io)

ChanglinZhou commented 5 years ago

I think this is a bug: even though the design regards POST as a non-idempotent operation, in our situation the request has not been sent out yet.

jrudolph commented 5 years ago

@ChanglinZhou, can you clarify how those requests are sent out? Is 10M the number of requests? How many of those run in parallel? What are your akka-http configuration settings? Which version do you use?

but we are in the situation which the request has not been sent out.

In recent versions, a request is only dispatched once the connection has been established. Also, that exception seems to be thrown only after a request has been sent, while waiting for the response. Can you give some evidence that this is not the case? If it is possible (I guess it may be infeasible given the volume), could you enable DEBUG logging and share some logs from the pool?

ChanglinZhou commented 5 years ago

producer -> Alpakka stream -> map to HTTP request -> akka-http newHostConnectionPool

version:

compile group: 'com.typesafe.akka', name: 'akka-actor_2.12', version: '2.5.19'
compile group: 'com.typesafe.akka', name: 'akka-stream_2.12', version: '2.5.19'
compile group: 'com.typesafe.akka', name: 'akka-http_2.12', version: '10.1.6'
compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-amqp_2.12', version: '1.0-M1'
compile group: 'com.typesafe.akka', name: 'akka-stream-kafka_2.12', version: '0.22'
compile group: "com.typesafe.akka", name: "akka-slf4j_2.12", version: '2.5.19'

config:

akka {
  loglevel = "INFO"
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  http.host-connection-pool {
    max-connections = 200
    max-open-requests = 4096
    // max-retries = 3 // AKKA http default value is 5, and it doesn't work for POST request
    response-entity-subscription-timeout = 15.seconds
  }
  http.client {
    idle-timeout = infinite
  }
}

I will try to turn on the DEBUG log.

ChanglinZhou commented 5 years ago

The server code:

var http = require('http');

http.createServer(function (req, res) {
      console.log(req.url)
      res.writeHead(200, {'Content-Type': 'text/plain'});
      res.end("Hello World");
}).on('connection', function(socket) {
      socket.setTimeout(5000);
}).listen(3000);

The client test code:

package run.fast

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
import scala.language.postfixOps // enables `3 seconds` without a feature warning
import scala.util.Success

object TestHttpPostStream extends App {
  implicit val system = ActorSystem("posting")

  implicit val mat = ActorMaterializer()
  val poolClientFlow = Http().cachedHostConnectionPool[Long]("remoteserver", 3000)

  def testGet() = {
    val stream = Source(Range(0, 30))
      .throttle(1, 3 seconds)
      .map((x: Int) => (HttpRequest(HttpMethods.GET, uri = s"http://remoteserver:3000/haha:$x")
        .withEntity(s"http://remoteserver:3000/haha:$x"), x.toLong))
      .via(poolClientFlow)
      .map({
        case (Success(r), b) =>
          println(s"success $b")
          r.entity.discardBytes()
        case _ =>
      })
      .runWith(Sink.ignore)
  }

  def testPost() = {
    val stream = Source(Range(0, 30))
      .throttle(1, 3 seconds)
      .map((x: Int) => (HttpRequest(HttpMethods.POST, uri = s"http://remoteserver:3000/haha:$x")
        .withEntity(s"http://remoteserver:3000/haha:$x"), x.toLong))
      .via(poolClientFlow)
      .map({
        case (Success(r), b) =>
          println(s"success $b")
          r.entity.discardBytes()
        case _ =>
      })
      .runWith(Sink.ignore)
  }

  testPost()
}

SBT file:

name := "scala_run_fast"

version := "0.1"

scalaVersion := "2.12.8"

val akkaHttpVersion = "10.1.9"

libraryDependencies ++= Seq (
  "com.microsoft.azure" % "azure-cosmosdb" % "2.5.1",
  "com.microsoft.azure" % "documentdb-bulkexecutor" % "2.5.0",
  ("com.typesafe.akka" %% "akka-http" % akkaHttpVersion),
  "com.typesafe.akka" %% "akka-actor" % "2.5.23",
  "com.typesafe.akka" %% "akka-stream" % "2.5.23",
  "com.netflix.eureka" % "eureka-client" % "1.9.12",
  "com.netflix.archaius" % "archaius-typesafe" % "0.7.6",
  "com.microsoft.azure" % "azure-documentdb" % "2.4.0")

By killing and restarting the server process several times, I can reproduce the missing requests.

It seems a massive volume of requests is not required.

jrudolph commented 5 years ago

@ChanglinZhou can you clarify what the expected behavior is and what you see? We cannot guarantee that every request will make it through if you kill and restart the server in the meantime.

ChanglinZhou commented 5 years ago

@jrudolph "kill and start" is just a convenient way to simulate failures. Today our test team ran another stress test that reproduced the failure while the server was in a healthy state.

My workaround is to use a separate queue to retry failed requests, which raises the overall success rate above 99.9%.
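The retry-queue code itself was not shared in the thread. A minimal, hypothetical sketch of the idea: every item gets one attempt on the main path, failures go onto a second queue, and that queue is drained until each item succeeds or exhausts `maxAttempts`. `RetryQueue.run` is an illustrative name, and `send` is a synchronous stand-in for the HTTP call; a real client would be asynchronous.

```scala
import scala.collection.mutable

object RetryQueue {
  // Sends every item once, pushes failures onto a retry queue, then drains
  // that queue until each item has succeeded or used up `maxAttempts`.
  // Returns (succeeded, givenUp) in the order the items finished.
  def run[A](items: Seq[A], maxAttempts: Int)(send: A => Boolean): (Vector[A], Vector[A]) = {
    val retryQueue = mutable.Queue.empty[(A, Int)] // (item, attempts made so far)
    var succeeded = Vector.empty[A]
    var givenUp = Vector.empty[A]

    def attempt(item: A, attemptsSoFar: Int): Unit = {
      val attempts = attemptsSoFar + 1
      if (send(item)) succeeded :+= item
      else if (attempts < maxAttempts) retryQueue.enqueue((item, attempts))
      else givenUp :+= item
    }

    items.foreach(attempt(_, 0)) // first pass: the main stream
    while (retryQueue.nonEmpty) { // second pass: drain the retry queue
      val (item, attemptsSoFar) = retryQueue.dequeue()
      attempt(item, attemptsSoFar)
    }
    (succeeded, givenUp)
  }
}
```

Keeping retries on a separate queue means a transient failure never blocks the main stream, at the cost of the retried requests arriving out of order.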

My suggestion is:

  1. Given that Akka HTTP treats POST as non-idempotent, it would be better not to discard the in-flight request. The error message is "The http server closed the connection unexpectedly before delivering responses for 1 outstanding requests".
  2. It would be better to let the user decide whether a POST request is idempotent.
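Suggestion 2 might look like the following hypothetical policy, in which the caller can override the idempotency decision per request. This is not an existing akka-http API: `Req`, `retriesFor`, and `safeToRetry` are illustrative names, and the default set lists the methods RFC 7231 defines as idempotent.

```scala
object UserRetryPolicy {
  // Hypothetical request model: just enough information to make a decision.
  final case class Req(method: String, path: String)

  // Default rule, mirroring `method.isIdempotent` in the quoted pool code:
  // only methods RFC 7231 calls idempotent are retried.
  def defaultIdempotent(r: Req): Boolean =
    Set("GET", "HEAD", "OPTIONS", "TRACE", "PUT", "DELETE").contains(r.method)

  // Suggested rule: the caller may replace the decision per request.
  def retriesFor(
      r: Req,
      maxRetries: Int,
      safeToRetry: Req => Boolean = (r: Req) => defaultIdempotent(r)): Int =
    if (safeToRetry(r)) maxRetries else 0
}
```

With this shape, a plain POST still gets 0 retries by default, while a caller who knows a particular POST is safe to repeat can pass `_ => true` and get the configured `max-retries`.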