typelevel / skunk

A data access library for Scala + Postgres.
https://typelevel.org/skunk/
MIT License

Broken Pipe Exception from Pool #524

Open channingwalton opened 3 years ago

channingwalton commented 3 years ago

I'm seeing this exception, which seems related to Skunk. It happens randomly, even when there is no interaction with the DB at all.

There was a suggestion that it's a connection loss, which I'm investigating.

java.io.IOException: Broken pipe
at apply @ skunk.util.Pool$.poolImpl$1(Pool.scala:80)
at java.base/sun.nio.ch.EPollPort$EventHandlerTask.run(EPollPort.java:306)
at flatMap @ fs2.io.net.SocketCompanionPlatform$AsyncSocket.go$2(SocketPlatform.scala:132)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:213)
at acquireN @ fs2.Stream.waitN$1(Stream.scala:1392)
at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:50)
at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:113)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:79)
at java.base/java.lang.Thread.run(Thread.java:829)
at async_ @ fs2.io.net.SocketCompanionPlatform$AsyncSocket.go$2(SocketPlatform.scala:126)
sebastianvoss commented 2 years ago

@channingwalton I'm currently seeing similar errors. Did you figure out the root cause?

channingwalton commented 2 years ago

@channingwalton I'm currently seeing similar errors. Did you figure out the root cause?

Hi, no unfortunately I didn’t have time. We’ve switched to ember which doesn’t suffer the same issue.

ahjohannessen commented 2 years ago

I am also getting java.io.IOException: Broken pipe and skunk.exception.EofException: EOF was reached on the network socket. This is with Session.pooled(max = 5), using Consul Connect with Envoy for mTLS.

It only happens after a while, and hitting the same endpoint a couple of times makes things work again, so I suppose it is some kind of timeout issue.

@sebastianvoss Did you figure out what was wrong in your case?

sebastianvoss commented 2 years ago

@ahjohannessen The root cause is connections that get into an invalid state (losing connectivity to the DB, or being closed by Postgres after an idle timeout). Those connections remain in the pool and won't be removed, because the health check only runs when a connection is returned to the pool. We solved this for our specific use case, but my plan is to look into a more generic solution and contribute it.
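
For illustration, a minimal sketch of the kind of pre-use check this implies, assuming a plain Session.pooled setup; the probed helper and its SELECT 1 probe are my own additions, not part of skunk:

import cats.effect.{IO, Resource}
import skunk._
import skunk.implicits._
import skunk.codec.all.int4

// Hypothetical wrapper: probe every checked-out session with a cheap query before
// handing it to the caller, so a connection that was silently closed (idle timeout,
// network drop) fails at checkout instead of in the middle of a real query.
def probed(pool: Resource[IO, Session[IO]]): Resource[IO, Session[IO]] =
  pool.evalTap(_.unique(sql"SELECT 1".query(int4)).void)

A failed probe still surfaces as an error at checkout, so in practice this wants a retry at the call site to pick up a fresh connection.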

wvandermerwe commented 2 years ago

@sebastianvoss How did you manage to work around this issue?

fdietze commented 2 years ago

Here's how I'm currently working around the problem in Scala.js / AWS Lambda: I only use skunk for typesafe query construction, but I submit the queries via pg-node / pg-connection-string (through ScalablyTyped). It also gave me a massive performance boost.

import skunk._

import scala.async.Async.{async, await}
import scala.concurrent.{ExecutionContext, Future}
import scala.scalajs.js
import scala.scalajs.js.annotation.JSImport
import scala.annotation.nowarn
import typings.pg.mod.ClientConfig
import typings.pg.mod.QueryArrayConfig
import typings.pg.mod.{Client => PgClient}
import cats.effect.std.Semaphore
import cats.effect.IO
import cats.implicits._

import cats.effect.unsafe.implicits.{global => unsafeIORuntimeGlobal}

import scala.scalajs.js.JSConverters._
import scala.util.{Failure, Success}

import skunk.implicits._

@js.native
@JSImport("pg-connection-string", JSImport.Namespace)
@nowarn("msg=never used")
object PgConnectionString extends js.Object {
  def parse(arg: String): ClientConfig = js.native
}

class PostgresClient(connectionString: String)(implicit ec: ExecutionContext) {

  private val client: PgClient              = new PgClient(PgConnectionString.parse(connectionString))
  private lazy val connection: Future[Unit] = client.connect().toFuture

  // Only one transaction may run at a time over the single pg-node connection.
  val transactionSemaphore: Future[Semaphore[IO]] = Semaphore[IO](1).unsafeToFuture()

  def command[PARAMS](
    command: Command[PARAMS],
    params: PARAMS = Void,
  ): Future[Unit] = async {
    await(connection)
    await(
      client
        .query(
          command.sql,
          command.encoder.encode(params).map(_.orNull).toJSArray,
        )
        .toFuture,
    )
    ()
  }

  def query[PARAMS, ROW](
    query: Query[PARAMS, ROW],
    params: PARAMS = Void,
  ): Future[Vector[ROW]] = async {

    await(connection) // wait until connection is ready
    val result = await(
      client
        .query[js.Array[js.Any], js.Array[js.Any]](
          QueryArrayConfig[js.Array[js.Any]](query.sql),
          query.encoder.encode(params).map(_.orNull.asInstanceOf[js.Any]).toJSArray,
        )
        .toFuture,
    )
    result.rows.view.map { row =>
      query.decoder.decode(
        0,
        row.view
          .map(any =>
            Option(any: Any).map {
              // TODO: tests for these data types
              case bool: Boolean => if (bool) "t" else "f"
              case date: js.Date =>
                // toString on js.Date localizes the date. `toISOString`
                // also adds time information that doesn't exist in the original
                // `Date` object in the database. This overhead gets cut away by
                // calling `substring`.
                date.toISOString().substring(0, 10)
              case other => other.toString
            },
          )
          .toList,
      ) match {
        case Left(err)         => throw new Exception(err.message)
        case Right(decodedRow) => decodedRow
      }
    }.toVector
  }

  def querySingleRow[PARAMS, ROW](queryFragment: Query[PARAMS, ROW], params: PARAMS = Void): Future[ROW] = async {
    val rows = await(query[PARAMS, ROW](queryFragment, params))
    if (rows.isEmpty) throw new RuntimeException("Requested single row, but got no rows.")
    rows.head
  }

  @nowarn("msg=dead code")
  def tx[T](code: => Future[T]): Future[T] = async {
    await(connection) // wait until connection is ready
    val semaphore: Semaphore[IO] = await(transactionSemaphore)

    await(semaphore.acquire.unsafeToFuture()) // wait until other transaction has finished
    await(command(sql"BEGIN".command))        // begin transaction

    await(code.transformWith {
      case Success(result) =>
        async {
          val committed = await(command(sql"COMMIT".command).attempt)
          await(semaphore.release.unsafeToFuture())
          committed match {
            case Right(_)  => result
            case Left(err) => throw err
          }
        }
      case Failure(exception) =>
        async {
          println(s"Transaction failed. Rolling back.")
          val rolledBack = await(command(sql"ROLLBACK".command).attempt)
          await(semaphore.release.unsafeToFuture())
          rolledBack match {
            case Right(_) => throw exception
            case Left(err) =>
              val finalException = new Exception("ROLLBACK FAILED")
              finalException.addSuppressed(err)
              finalException.addSuppressed(exception)

              throw finalException
          }
        }
    })
  }
}
pmsfc commented 1 year ago

@tpolecat This still happens in the latest version (0.6.0) when using an RDS Proxy. The proxy applies an IdleClientTimeout to every connection (default 30 minutes), so if the pool is slightly bigger than we need, some sessions sit idle for more than 30 minutes and throw skunk.exception.EofException when they are next used.

For now we've switched to a smaller pool size and bumped the RDS Proxy timeout to 4h, which makes it happen far less often, but it still feels hacky.

Is there a way to apply health checks to idle sessions, or to recover easily from this exception?

armanbilge commented 1 year ago

Ember Client had a similar problem with connections going stale, here's how we fixed it:

yilinwei commented 1 year ago

@pmsfc

It seems to be a common enough issue that libpq has a keep alive parameter associated with it, which it sets on the underlying socket.

fs2-io does have support for the option under .keepAlive, which you can pass in during creation of the pooled session.
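
For reference, a rough sketch of what passing the option could look like, assuming skunk 0.6.x (which needs an otel4s Tracer in scope); the connection details are placeholders:

import cats.effect.{IO, Resource}
import fs2.io.net.SocketOption
import skunk._
import org.typelevel.otel4s.trace.Tracer

implicit val tracer: Tracer[IO] = Tracer.noop[IO] // no-op tracing, just to satisfy the constraint

// Ask the OS to send TCP keep-alive probes on every pooled connection's socket,
// so intermediaries are less likely to silently drop idle connections.
val pool: Resource[IO, Resource[IO, Session[IO]]] =
  Session.pooled[IO](
    host          = "localhost",
    port          = 5432,
    user          = "postgres",
    database      = "postgres",
    password      = Some("secret"),
    max           = 5,
    socketOptions = List(SocketOption.keepAlive(true))
  )

Whether this helps depends on why connections are being dropped; a proxy that enforces its own idle-client timeout may ignore TCP keep-alives entirely.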

pmsfc commented 1 year ago

Thanks! I'll try it and come back here with the results.

pmsfc commented 1 year ago

I'm still seeing issues with keep-alive set to true (socketOptions = List(SocketOption.keepAlive(true))). I went back to the default 30-minute idle timeout on the RDS Proxy, and after roughly 30 minutes the errors appeared again.

pmsfc commented 1 year ago

We moved to Doobie, hopefully no more of these in JDBC land.

TonioGela commented 10 months ago

@pmsfc

It seems to be a common enough issue that libpq has a keep alive parameter associated with it, which it sets on the underlying socket.

fs2-io does have support for the option under .keepAlive, which you can pass in during creation of the pooled session.

Won't this fix the issue only when the connection CAN be kept alive? At the moment I'm hitting it because my Postgres instance, if untouched for at least 30 minutes, gets "garbage collected" by the infrastructure and is spun up again whenever you attempt a new query/command. In that case the keepAlive parameter is useless, as the connection can't be kept alive (the target no longer exists in network terms). I also want to point out that calling the corresponding GET API (the one that performs that Postgres call) a second time ALWAYS works.

Something useful would probably be some kind of health check that closes the session/resource when the DB is not reachable. I haven't taken a look at the source tbf, it's all theoretical.

[EDIT] The easy fix I have in place is of course a retry, which resolves the issue 99.9% of the time. [EDIT 2] I've looked at the code, and Recyclers should serve this purpose, so I can't tell where the problem is coming from.
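
For illustration, a minimal sketch of that kind of retry, assuming a pooled Session resource; the retryOnEof helper is made up for this example. Retrying the whole checkout means the second attempt can land on a different, freshly recycled connection:

import cats.effect.{IO, Resource}
import skunk.Session
import skunk.exception.EofException

// Hypothetical helper: run an action against a checked-out session, and if the
// session turns out to be dead (EOF / broken pipe), retry once from scratch so
// the pool can hand out a different connection.
def retryOnEof[A](pool: Resource[IO, Session[IO]])(action: Session[IO] => IO[A]): IO[A] =
  pool.use(action).recoverWith {
    case _: EofException | _: java.io.IOException => pool.use(action)
  }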

ThatScalaGuy commented 4 months ago

I encountered this issue when running my application and database in a Docker Swarm environment. After precisely 30 minutes of inactivity, connections are terminated but remain in the connection pool. Interestingly, the problem does not occur when running the application natively, with the database in Docker, on my development machine.

The exact point in the network stack where the connection termination occurs remains unclear. To mitigate the issue, I implemented two potential solutions:

1. Custom Connection Pool Implementation: I developed a custom implementation of the ConnectionPool that periodically checks and validates the connections available in the pool (a lightweight sketch of the same idea follows after this list).

2. Idle Connection Termination: A more efficient approach involves terminating idle connections after a specified period. This method is likely to be more effective and resource-friendly.
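
For illustration, a rough sketch of the first approach as a lightweight background task rather than a full custom pool; the keepWarm helper and the five-minute interval are made up for this example:

import scala.concurrent.duration._
import cats.effect.{IO, Resource}
import cats.syntax.all._
import skunk.Session
import skunk.implicits._
import skunk.codec.all.int4

// Hypothetical background task: every few minutes, check out up to `max` sessions
// concurrently and run SELECT 1 on each. This stops pooled connections from sitting
// idle long enough to be killed, and exercises the pool's health check on any
// connection that has already died.
def keepWarm(pool: Resource[IO, Session[IO]], max: Int, every: FiniteDuration = 5.minutes): Resource[IO, Unit] =
  (IO.sleep(every) *>
    List.fill(max)(pool.use(_.unique(sql"SELECT 1".query(int4)))).parSequence_.attempt.void
  ).foreverM.background.void

Composing this Resource into the application's startup keeps the pinger running for the app's lifetime.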