http4s / blaze

Blazing fast NIO microframework and Http Parser
Apache License 2.0
351 stars 63 forks source link

`IllegalStateException`: supervisor already shutdown #894

Open ShahOdin opened 3 months ago

ShahOdin commented 3 months ago

As originally raised in https://github.com/http4s/http4s/discussions/7502

The following code:

Click to expand ```scala import cats.effect.{IO, IOApp} import cats.effect.IO.asyncForIO import cats.effect.kernel.Resource import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.client.Client import scala.concurrent.duration.* object Demo extends IOApp.Simple: val clientResource: Resource[IO, Client[IO]] = BlazeClientBuilder[IO].resource def query: IO[Int] = clientResource.use( _ .statusFromString(s"https://postman-echo.com/get") .map(_.code) .flatTap(s => IO.println(s"request: returned with status: $s")) ) override def run: IO[Unit] = fs2.Stream .range(1, 100) .metered(1.seconds) .evalMap(_ => query) .compile .drain ```

throws the following logs:

java.lang.IllegalStateException: supervisor already shutdown at get @ fs2.internal.Scope.openScope(Scope.scala:275) at get @ fs2.internal.Scope.openScope(Scope.scala:275) at unique @ fs2.Compiler$Target$ConcurrentTarget.unique(Compiler.scala:194)

Note that you also see after each request:

INFO org.http4s.blaze.client.PoolManager - Shutting down connection pool: curAllocated=1 idleQueues.size=1 waitQueue.size=0 maxWaitQueueLimit=256 closed=false

if I change the code to:

Click to expand ```scala import cats.effect.{IO, IOApp, Resource} import cats.effect.IO.asyncForIO import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.client.Client import scala.concurrent.duration.* object Demo extends IOApp.Simple: val clientResource: Resource[IO, Client[IO]] = BlazeClientBuilder[IO].resource def query(client: Client[IO]): IO[Int] = client .statusFromString(s"https://postman-echo.com/get") .map(_.code) .flatTap(s => IO.println(s"request: returned with status: $s")) override def run: IO[Unit] = clientResource.use(c => fs2.Stream .range(1, 100) .metered(1.seconds) .evalMap(_ => query(c)) .compile .drain ) ```

ie,

-    fs2.Stream
-      .range(1, 100)
-      .metered(1.seconds)
-      .evalMap(_ => query)
-      .compile
-      .drain
+    clientResource.use(c =>
+      fs2.Stream
+        .range(1, 100)
+        .metered(1.seconds)
+        .evalMap(_ => query(c))
+        .compile
+        .drain
+    )

The problem goes away. I also checked this with the ember client and not seeing the issue there:

Click to expand ```scala import cats.effect.{IO, IOApp, Resource} import cats.effect.IO.asyncForIO import org.http4s.ember.client.EmberClientBuilder import org.http4s.client.Client import scala.concurrent.duration.* object Demo extends IOApp.Simple: val clientResource: Resource[IO, Client[IO]] = EmberClientBuilder.default[IO].build def query: IO[Int] = clientResource.use( _ .statusFromString(s"https://postman-echo.com/get") .map(_.code) .flatTap(s => IO.println(s"request: returned with status: $s")) ) override def run: IO[Unit] = fs2.Stream .range(1, 100) .metered(1.seconds) .evalMap(_ => query) .compile .drain ```