akka / akka-http

The Streaming-first HTTP server/module of Akka
https://doc.akka.io/docs/akka-http

Change `max-open-requests` to be inclusive #3805

Open harpocrates opened 3 years ago

harpocrates commented 3 years ago

When using the streaming connection pool client API, I usually don't care much about max-open-requests, since max-connections is the factor that actually determines the number of concurrent connections. However, max-open-requests still has to be set to a value strictly greater than max-connections. Combined with the fact that max-open-requests also has to be a power of two, the configuration ends up being unintuitive.
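
Concretely, the two constraints, as I read the current behavior (a summary of what I observe, not actual akka-http code):

// Hypothetical helper, only to state the constraints described above
def isAcceptable(maxConnections: Int, maxOpenRequests: Int): Boolean =
  maxOpenRequests > maxConnections &&       // strictly greater than max-connections
    Integer.bitCount(maxOpenRequests) == 1  // exactly one bit set, i.e. a power of two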

I wish the following would work instead of throwing a BufferOverflowException:

import $ivy.`com.typesafe.akka::akka-actor:2.6.14`
import $ivy.`com.typesafe.akka::akka-stream:2.6.14`
import $ivy.`com.typesafe.akka::akka-http:10.2.4`

import akka.actor.ActorSystem
import akka.http.scaladsl._
import akka.stream.scaladsl.{Sink, Source}
import akka.http.scaladsl.model.HttpRequest
import java.util.concurrent.atomic.AtomicLong
import scala.util.{Failure, Success}
import akka.http.scaladsl.settings.ConnectionPoolSettings

implicit val system: ActorSystem = ActorSystem()

val pending = new AtomicLong()
val MaxConnections = 64
val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings.default
  .withMaxConnections(MaxConnections)
  .withMaxOpenRequests(MaxConnections)

Source(0 to 200)
  .map { x: Int =>
    println(s"Pending: ${pending.incrementAndGet()}")  // This always prints a number in the 1-64 range (inclusive)
    HttpRequest(uri = s"https://www.google.com/search?q=$x") -> x
  }
  .via(
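    // With max-open-requests == max-connections, running this pool flow
    // fails with a BufferOverflowException, even though the upstream never
    // has more than 64 requests in flight.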
    Http().newHostConnectionPoolHttps[Int](
      host = "www.google.com",
      settings = connectionPoolSettings
    )
  )
  .map { case (outcome, _) =>
    pending.decrementAndGet()
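    // Consume or discard each response entity so the pooled connection is freed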
    outcome match {
      case Failure(err) => println(err)
      case Success(r) => r.discardEntityBytes()
    }
  }
  .to(Sink.ignore)
  .run()

Instead, to keep this from throwing, I have to change the withMaxOpenRequests call to:

  .withMaxOpenRequests(MaxConnections * 2) // just need more than `MaxConnections`, and also a power of 2
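
For reference, the same workaround expressed through the standard pool configuration keys in application.conf:

akka.http.host-connection-pool {
  max-connections   = 64
  max-open-requests = 128  # a power of two, strictly greater than max-connections
}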
jrudolph commented 3 years ago

I agree that this is awkward.

In fact, both the power-of-two requirement and any dependency between max-connections and max-open-requests are not really warranted any more.

I guess the easiest solution for newHostConnectionPool and cachedHostConnectionPool would be to actually implement parts of #2120, i.e. provide an actual streaming interface without the current crutch, which uses a mixture of mapAsyncUnordered and explicit buffers to provide backpressure. The current solution can lead to issues like this one, where the buffers in mapAsyncUnordered and the PoolInterface might not quite agree on the current state.
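
To make that mixture concrete, here is a rough sketch of the shape of the current client pool flow (names and structure are mine, assuming akka-http 10.2.x on Scala 2.13; the real PoolInterface is considerably more involved): an explicit buffer sized by max-open-requests feeds a mapAsyncUnordered stage capped at max-connections, and because those are two separate stages, their views of how many requests are in flight can drift apart.

import akka.NotUsed
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Flow
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Success, Try}

// Sketch only: poolFlowSketch and dispatch are hypothetical names,
// not part of the akka-http API.
def poolFlowSketch[T](
    dispatch: HttpRequest => Future[HttpResponse],
    maxConnections: Int,
    maxOpenRequests: Int
): Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] =
  Flow[(HttpRequest, T)]
    // Explicit buffer standing in for the pool's request buffer; the real
    // pool fails with a BufferOverflowException when it fills up instead
    // of backpressuring as modelled here.
    .buffer(maxOpenRequests, OverflowStrategy.backpressure)
    // mapAsyncUnordered bounds how many requests are dispatched at once,
    // playing the role of max-connections.
    .mapAsyncUnordered(maxConnections) { case (request, tag) =>
      dispatch(request)
        .transform(result => Success(result -> tag))(ExecutionContext.parasitic)
    }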