typelevel / skunk

A data access library for Scala + Postgres.
https://typelevel.org/skunk/
MIT License
1.58k stars 160 forks

Concurrent use of portals and simple queries invalidates portals #553

Open hnaderi opened 2 years ago

hnaderi commented 2 years ago

To reproduce:

```scala
// from CursorTest.scala
sessionTest("invalidation") { s =>
  cursor(s).use { c =>
    for {
      _ <- c.fetch(1)
      _ <- s.execute(sql"select 1;".query(int4))
      _ <- c.fetch(1)
    } yield ()
  }
}
```

This will fail with the exception `Problem: Portal "portal_2" does not exist`.

From what can be seen with tcpdump and Wireshark, the portal gets invalidated implicitly: the server takes the invalidation as a given and does not send any message about it.

Expected behavior: this is hard to say, as it is exactly what Postgres does, which is inconvenient IMO, but this issue can be mitigated by warnings in the documentation, method docs, etc.
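For reference, one way to avoid the failure (a sketch, not a proposed fix): run the whole sequence inside an explicit transaction, since a named portal lasts until the end of the current transaction. `cursor` and `sessionTest` are assumed to be the same helpers as in the reproduction above.

```scala
// Sketch only: wrapping the same steps in s.transaction keeps the
// portal alive across the interleaved simple query, because the
// portal is only destroyed when the enclosing transaction ends.
sessionTest("invalidation-workaround") { s =>
  s.transaction.use { _ =>
    cursor(s).use { c =>
      for {
        _ <- c.fetch(1)
        _ <- s.execute(sql"select 1;".query(int4))
        _ <- c.fetch(1)
      } yield ()
    }
  }
}
```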

tpolecat commented 2 years ago

Thanks for this, I'll do some experiments and update the doc as needed.

tpolecat commented 2 years ago

By the way you can add debug = true to your Session configuration and you'll get a transcript of all the messages that go between client and server. No need for Wireshark, etc.
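For example (a sketch; the connection parameters here are placeholders):

```scala
// debug = true prints every protocol message exchanged with the server.
val session = Session.single[IO](
  host     = "localhost",
  port     = 5432,
  user     = "postgres",
  database = "world",
  password = Some("changeme"),
  debug    = true
)
```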

hnaderi commented 2 years ago

The relevant documentation, which specifies this behavior: https://www.postgresql.org/docs/10/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY

> If successfully created, a named portal object lasts till the end of the current transaction, unless explicitly destroyed.

hnaderi commented 2 years ago

> No need for Wireshark, etc.

I was trying to capture the server communication itself to see if other clients' behavior is different, but it turned out to be fruitless :smile:

benhutchison commented 2 years ago

I suspect I'm hitting this or a related problem.

I'm trying to do a bunch of bulk inserts.

When I do them serially with `traverse` it works. When I try in parallel with `parTraverseN(8)`, I get Postgres errors like `Portal "portal_35" does not exist`.

tpolecat commented 2 years ago

I would need to see some code I guess, but it sounds like you need to operate inside a transaction if you want to have multiple active portals.

benhutchison commented 2 years ago

Thanks for the pointer, issue solved :)

I added a session pool and ran each of my parallel insert jobs in a separate session & transaction. Stable and very fast! 🏎️
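For anyone finding this later, the shape of that fix looks roughly like this (a sketch; `jobs` and `runInserts` are hypothetical stand-ins for the actual insert work, and the connection parameters are placeholders):

```scala
// Each parallel job checks out its own session from the pool and runs
// its inserts in its own transaction, so portals never outlive their
// transaction or cross session boundaries.
Session.pooled[IO](
  host     = "localhost",
  user     = "postgres",
  database = "world",
  password = Some("changeme"),
  max      = 8 // matches parTraverseN(8)
).use { pool =>
  jobs.parTraverseN(8) { job =>
    pool.use { s =>
      s.transaction.use { _ => runInserts(s, job) } // hypothetical helper
    }
  }
}
```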

matthughes commented 1 year ago

Don't the Postgres docs indicate the prepared statement should be good for the life of the connection?

> Prepared statements only last for the duration of the current database session. When the session ends, the prepared statement is forgotten, so it must be recreated before being used again. This also means that a single prepared statement cannot be used by multiple simultaneous database clients; however, each client can create their own prepared statement to use. Prepared statements can be manually cleaned up using the DEALLOCATE command.

I don't know. Reading through their docs, I don't get a clear sense of whether session = tx or session = connection.

matthughes commented 1 year ago

I still get this randomly even inside explicit transactions. I'm not doing anything parallel as far as I can tell. Will try to experiment with debug to see if I can reproduce.

hnaderi commented 1 year ago

> I don't know. Reading through their docs, I don't get a clear sense of whether session = tx or session = connection.

@matthughes From what I remember, the word "session" in the pg docs is used broadly, and in this case the term refers to a live transaction. If the transaction is terminated (for whatever reason), the session is also closed and all of its prepared statements are destroyed, while the connection remains alive and functional.
This seems to be an implicit protocol behavior, as no messages are transferred between the client and server in this scenario.

> I still get this randomly even inside explicit transactions. I'm not doing anything parallel as far as I can tell. Will try to experiment with debug to see if I can reproduce.

It's probably an implicit failure in the transactions, e.g. merging two streams together.

matthughes commented 1 year ago

Here's an example log showcasing the failure. I have a service that obtains a session, opens a transaction, and does a `session.prepare` call. I hit this service in parallel to exhibit the behavior. I believe this shows the two transactions' communication intermingled.

```
 → Query(BEGIN)
 ← CommandComplete(Begin)
 ← ReadyForQuery(Active)
 → Query(select txid_current(), pg_backend_pid())
 ← RowDescription(Field(txid_current, 20); Field(pg_backend_pid, 23))
 ← RowData()
 ← CommandComplete(Select(1))
 ← ReadyForQuery(Active)
 → Bind(portal_1275,statement_29,List(Some(73bc60f8-3cce-4080-8958-d064ca93651e)))
 → Flush
 ← BindComplete
 → Execute(portal_1275,2147483647)
 → Flush
 ← RowData()
 ← CommandComplete(Select(1))
 → Sync
 ← ReadyForQuery(Active)
 → Close(P,portal_1275)
 → Flush
 ← CloseComplete
 → Query(COMMIT)
 ← CommandComplete(Commit)
 ← ReadyForQuery(Idle)
 → Query(UNLISTEN *)
 ← CommandComplete(Unlisten)
 ← ReadyForQuery(Idle)
 → Query(RESET ALL)
 ← CommandComplete(Reset)
 ← ReadyForQuery(Idle)
 → Bind(portal_1276,statement_31,List(Some(SomeParam)))
 → Flush
 ← BindComplete
 → Execute(portal_1276,2147483647)
 → Flush
 ← RowData()
 ← CommandComplete(Select(1))
 → Sync
 ← ReadyForQuery(Idle)
 → Close(P,portal_1276)
 → Flush
 ← CloseComplete
 → Bind(portal_1277,statement_21,List(Some(SomeParam)))
 → Flush
 ← BindComplete
 → Query(BEGIN)
 ← CommandComplete(Begin)
 ← ReadyForQuery(Active)
 → Execute(portal_1277,2147483647)
 → Flush
 ← RowData()
 ← CommandComplete(Select(1))
 → Sync
 ← ReadyForQuery(Active)
 → Query(select txid_current(), pg_backend_pid())
 ← RowDescription(Field(txid_current, 20); Field(pg_backend_pid, 23))
 ← RowData()
 ← CommandComplete(Select(1))
 ← ReadyForQuery(Active)
 → Close(P,portal_1277)
 → Flush
 ← CloseComplete
 → Bind(portal_1278,statement_29,List(Some(73bc60f8-3cce-4080-8958-d064ca93651e)))
 → Flush
 ← BindComplete
 → Bind(portal_1279,statement_52,List(Some(SomeParam), Some(73bc60f8-3cce-4080-8958-d064ca93651e)))
 → Flush
 ← BindComplete
 → Execute(portal_1278,2147483647)
 → Flush
 ← RowData()
 ← CommandComplete(Select(1))
 → Sync
 ← ReadyForQuery(Active)
 → Execute(portal_1279,2147483647)
 → Flush
 ← RowData()
 ← CommandComplete(Select(1))
 → Sync
 ← ReadyForQuery(Active)
 → Close(P,portal_1278)
 → Flush
 ← CloseComplete
 → Close(P,portal_1279)
 → Flush
 ← CloseComplete
 → Query(COMMIT)
 ← CommandComplete(Commit)
 ← ReadyForQuery(Idle)
 → Bind(portal_1280,statement_40,List(Some(SomeParam), Some(73bc60f8-3cce-4080-8958-d064ca93651e)))
 → Flush
 ← BindComplete
 → Query(UNLISTEN *)
 ← CommandComplete(Unlisten)
 ← ReadyForQuery(Idle)
 → Execute(portal_1280,2147483647)
 → Flush
 ← ErrorResponse(F -> postgres.c, M -> portal "portal_1280" does not exist, V -> ERROR, L -> 2074, C -> 34000, R -> exec_execute_message, S -> ERROR)
 → Sync
 ← ReadyForQuery(Idle)
 → Query(RESET ALL)
 ← CommandComplete(Reset)
 ← ReadyForQuery(Idle)
 → Close(P,portal_1280)
 → Flush
 ← CloseComplete
 → Bind(portal_1281,statement_31,List(Some(SomeParam)))
 → Flush
 ← BindComplete
 → Execute(portal_1281,2147483647)
```

matthughes commented 1 year ago

This UNLISTEN right after the BindComplete looks wrong, but it's hard to know since these (I believe) are messages from two different connections interspersed. Is there a relatively easy way to add some sort of connection identifier to the debug log?

Or can we somehow use MessageSocket.history in an error message, so we can replay the last N logged messages before a failure?

matthughes commented 1 year ago

Note that setting `commandCache = 0` makes this problem go away.
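For reference, that setting goes in the session configuration (a sketch; the other parameters are placeholders):

```scala
// Disabling the statement caches works around the stale-portal failure
// at the cost of re-preparing statements on every use.
val session = Session.single[IO](
  host         = "localhost",
  user         = "postgres",
  database     = "world",
  password     = Some("changeme"),
  commandCache = 0,
  queryCache   = 0
)
```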

matthughes commented 1 year ago

Ok, I finally found what was causing the issue in my code and was able to reproduce it in a Skunk PR.

https://github.com/typelevel/skunk/pull/904

It seems calling .flatMap(identity) or .flatten "outside" of session/tx resource acquisition causes this problem. I don't know if that's a bug in Skunk, a bug in CE, or just a bug in my understanding.
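To illustrate the scoping difference (a hedged sketch; `sessions: Resource[IO, Session[IO]]` and `doWork: Session[IO] => IO[Unit]` are hypothetical):

```scala
// Problematic: the inner IO returned by doWork escapes the Resource
// scope, so by the time .flatten runs it, the session may already have
// been recycled (UNLISTEN * / RESET ALL) by the pool.
val bad: IO[Unit] = sessions.use(s => IO.pure(doWork(s))).flatten

// Safe: everything that touches the session runs inside `use`.
val good: IO[Unit] = sessions.use(s => doWork(s))
```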

silles79 commented 1 year ago

Hi, we have hit the same problem.

We have something like this:

```scala
Stream(getStuffFromDb1, getStuffFromDb2).parJoinUnbounded
  .through(KafkaProducer.pipe(producerSettings, kafkaProducer))
  .evalTap(_ => commitOffsets(Chunk(record)))
  .flatMap(r => Stream.chunk(r.records.map(_._1.value)))
```

getStuffFromDb1 and getStuffFromDb2 go to the db and stream some data from here and there; each does something like:

```scala
for {
  session <- Stream.resource(database.makeSession)
  r       <- getStuffFromDb(session)(id)
} yield r
```

When using parJoinUnbounded we got the same `portal_X does not exist` errors. Using .parJoin(1) "fixed" the problem, but that is obviously not ideal.

So did using `queryCache = 0`.