davenverse / rediculous

REmote DIctionary Client, that's hysterical.
https://davenverse.github.io/rediculous
MIT License

Early cancellation intermittently leaves the rediculous connection in an odd state. #53

Open Swoorup opened 2 years ago

Swoorup commented 2 years ago

Using rediculous with a timeout intermittently causes weird issues.

Repro test case.

Add the following to RedisCommandsSpec.scala and do a few trial runs.

  test("early termination"){
    redisConnection().flatMap{ connection => 
      val msg1 = "msg1" -> "msg1"
      val msg2 = "msg2" -> "msg2"
      val msg3 = "msg3" -> "msg3"

      val xopts = 
        RedisCommands.XReadOpts.default
          .copy(blockMillisecond = 0L.some, count = 1L.some)
          // .copy(count = 1L.some, blockMillisecond = 1000L.some)

      val offset: Set[RedisCommands.StreamOffset] = Set(RedisCommands.StreamOffset.From("foo", "$"))

      val extract = (resp: Option[List[RedisCommands.XReadResponse]]) => 
        resp.flatMap(_.headOption).flatMap(_.records.headOption).flatMap(_.keyValues.headOption)

      val action = 
        for {
          _ <- (
            RedisCommands.xadd[RedisIO]("foo", List(msg1)),
            RedisCommands.xadd[RedisIO]("foo", List(msg2)),
            RedisCommands.xadd[RedisIO]("foo", List(msg3))
          ).tupled.run(connection)
          msg1 <- RedisCommands.xread[RedisIO](offset, xopts).run(connection).timeout(50.milli).attempt.map{
            case Right(resp) => extract(resp)
            case Left(_) => None
          }
          empty <- RedisCommands.xread[RedisIO](offset, xopts).run(connection).timeout(50.milli).replicateA(100).attempt
          _ <- RedisCommands.del[RedisIO]("foo").run(connection)
        } yield msg1

      action.assertEquals(None)
    }
  }

Observe the error

==> X io.chrisdavenport.rediculous.RedisCommandsSpec.early termination  0.298s io.chrisdavenport.rediculous.RedisError$Generic: Rediculous: Impossible Return List was Empty but we guarantee output matches input
    at io.chrisdavenport.rediculous.RedisConnection$.$anonfun$head$1(RedisConnection.scala:109)
    at cats.ApplicativeError$LiftFromOptionPartially$.apply$extension(ApplicativeError.scala:329)
    at cats.syntax.OptionOps$LiftToPartiallyApplied.apply(option.scala:374)
    at io.chrisdavenport.rediculous.RedisConnection$.head(RedisConnection.scala:109)
    at io.chrisdavenport.rediculous.RedisConnection$.$anonfun$runRequest$24(RedisConnection.scala:113)
    at clear @ org.http4s.client.middleware.Retry$.retryLoop$1(Retry.scala:77)
    at clear @ org.http4s.client.middleware.Retry$.retryLoop$1(Retry.scala:77)
    at flatMap @ org.typelevel.keypool.KeyPool$.put(KeyPool.scala:250)
    at void @ org.typelevel.keypool.KeyPool$.reap(KeyPool.scala:161)
    at get @ fs2.internal.Scope.openScope(Scope.scala:281)
    at flatMap @ org.typelevel.keypool.KeyPool$.$anonfun$take$7(KeyPool.scala:278)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at clear @ org.http4s.client.middleware.Retry$.retryLoop$1(Retry.scala:77)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)

The fix is probably to add .uncancelable to the effect returned by RedisConnection::explicitPipelineRequest, so that a timeout cannot interrupt a request midway.
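To illustrate what .uncancelable would change, here is a minimal sketch using plain cats-effect 3 rather than rediculous itself. The names `exchange` and `cancelEarly` are hypothetical, the 100 ms sleep is only a stand-in for waiting on a Redis reply, and the scala-cli dependency line is an assumption about how the snippet is run. It does not reproduce the actual body of explicitPipelineRequest.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._

object UncancelableSketch {
  // Hypothetical stand-in for one request/response exchange:
  // the "reply" is considered read once the flag is set, 100 ms later.
  def exchange(replyRead: Ref[IO, Boolean]): IO[Unit] =
    IO.sleep(100.millis) *> replyRead.set(true)

  // Start the exchange, cancel it after 10 ms, then report whether the reply was read.
  def cancelEarly(wrap: IO[Unit] => IO[Unit]): IO[Boolean] =
    for {
      read  <- Ref.of[IO, Boolean](false)
      fiber <- wrap(exchange(read)).start
      _     <- IO.sleep(10.millis)
      _     <- fiber.cancel // for an uncancelable fiber this waits until it has finished
      done  <- read.get
    } yield done

  // Cancelable exchange: interrupted before the reply is read.
  def runPlain(): Boolean = cancelEarly(identity).unsafeRunSync()

  // Uncancelable exchange: the cancel request is deferred until the reply is read.
  def runMasked(): Boolean = cancelEarly(_.uncancelable).unsafeRunSync()
}
```

In this sketch `runPlain()` reports that the reply was never read, while `runMasked()` reports that it was. The cost is that the canceller waits for the whole exchange, which is presumably the trade-off of this approach.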

ChristopherDavenport commented 2 years ago

Ok, I think I've narrowed it down. I generally use the queued connection rather than the pooled one, which is why I haven't run into this. But with the pool you have the ability to interrupt the underlying system, which can leave connections in a bad state.

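import cats.effect.Concurrent
import cats.effect.std.Supervisor
import cats.syntax.all._

// Run fa on a supervised fiber; cancelling the caller cancels only the join, not fa itself.
def makeSoftCancelable[F[_]: Concurrent, A](fa: F[A], supervisor: Supervisor[F]): F[A] =
  supervisor.supervise(fa)
    .flatMap(_.joinWith(Concurrent[F].raiseError(new java.util.concurrent.CancellationException("Outcome was Canceled"))))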

I should be able to make this soft cancelable with a dispatcher in the pooled and direct versions to prevent these from leaking.
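A rough sketch of how that helper behaves when the caller is cancelled early, assuming cats-effect 3. The helper is restated from the snippet above; `SoftCancelSketch`, `run`, and the sleeps are hypothetical and stand in for a real request, and the scala-cli dependency line is an assumption about how the snippet is run.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.{Concurrent, IO, Ref}
import cats.effect.std.Supervisor
import cats.effect.unsafe.implicits.global
import cats.syntax.all._
import scala.concurrent.duration._

object SoftCancelSketch {
  // Restated from the thread: fa runs on a fiber owned by the supervisor,
  // and the caller merely joins it.
  def makeSoftCancelable[F[_]: Concurrent, A](fa: F[A], supervisor: Supervisor[F]): F[A] =
    supervisor.supervise(fa)
      .flatMap(_.joinWith(Concurrent[F].raiseError(
        new java.util.concurrent.CancellationException("Outcome was Canceled"))))

  // Cancel the caller after 10 ms and check whether the supervised work still completed.
  def run(): Boolean =
    Supervisor[IO].use { supervisor =>
      for {
        read   <- Ref.of[IO, Boolean](false)
        work    = IO.sleep(100.millis) *> read.set(true) // stand-in for a request
        caller <- makeSoftCancelable(work, supervisor).start
        _      <- IO.sleep(10.millis)
        _      <- caller.cancel        // returns promptly: only the join is cancelled
        _      <- IO.sleep(200.millis) // give the supervised fiber time to finish
        done   <- read.get
      } yield done
    }.unsafeRunSync()
}
```

Here `run()` reports that the work completed even though its caller was cancelled. The caller is released right away, while the request itself keeps running until it finishes or the supervisor is shut down.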

Swoorup commented 2 years ago

Great to hear. I do recall having issues with the pool and then switching to queued a few months ago, but I didn't note what caused the issue, since it was intermittent.

ChristopherDavenport commented 2 years ago

Hmmm, well that fixed that error and introduced a new one. We're in for a wild ride.

==> X io.chrisdavenport.rediculous.RedisCommandsSpec.early termination  30.004s java.util.concurrent.TimeoutException: Future timed out after [30 seconds]

ChristopherDavenport commented 2 years ago

It's binary compatible, though, and should remain so, so I think we are good to release the next version and fix it afterwards.