http4s / http4s

A minimal, idiomatic Scala interface for HTTP
https://http4s.org/
Apache License 2.0

Random BodyAlreadyConsumedError when requesting a remote URI and consuming the body entity twice #7443

Closed counter2015 closed 1 month ago

counter2015 commented 1 month ago

Recently, I discovered a strange phenomenon.

The full code can be accessed from https://github.com/counter2015/http4s-bug

Strict as in not lazy, where lazy means streamed from the socket. Data from the socket can only be read once.

The following code should raise an error, since it reads the response body twice:

    def io(body: Json, local: Boolean) =
      val endpoint =
        if (!local) uri"https://httpbin.org/post" // random error: Body Has Been Consumed Completely Already
        else uri"http://127.0.0.1:8081/post" // switch to local api, the error will not occur

      val request = Request[IO](Method.POST, endpoint).withEntity(body)
      for {
        result <- client.use { httpClient =>
          httpClient.run(request).use { response =>
            for {
              body <- response.bodyText.compile.string
              _ <- logger.info(s"Response body: $body")
              data <- response.asJsonDecode[Json]
              _ <- logger.info(data.toString)
            } yield data
          }
        }
      } yield result

However, it runs fine when the request URI is localhost, and fails randomly for a remote host. I have tested it several times, and the result was 14 successes and 6 failures.

The exception message looks like the following:

org.http4s.ember.core.Parser$Body$BodyAlreadyConsumedError: Body Has Been Consumed Completely Already
    at org.http4s.ember.core.Parser$Body$BodyAlreadyConsumedError$.apply(Parser.scala:597)
    at org.http4s.ember.core.Parser$Body$.$anonfun$2(Parser.scala:621)
    at fs2.Pull$$anon$1.cont(Pull.scala:149)
    at fs2.Pull$BindBind.cont(Pull.scala:735)
    at fs2.Pull$ContP.apply(Pull.scala:683)
    at fs2.Pull$ContP.apply$(Pull.scala:682)
    at fs2.Pull$Bind.apply(Pull.scala:691)
    at fs2.Pull$Bind.apply(Pull.scala:691)
    at fs2.Pull$.goEval$1$$anonfun$1(Pull.scala:1097)
    at get @ fs2.internal.Scope.openScope(Scope.scala:275)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Pull$.goCloseScope$1$$anonfun$1$$anonfun$3(Pull.scala:1217)
    at update @ org.http4s.ember.server.internal.Shutdown$$anon$1.<init>(Shutdown.scala:78)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at modify @ org.http4s.ember.server.internal.Shutdown$$anon$1.<init>(Shutdown.scala:90)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Pull$.goCloseScope$1$$anonfun$1(Pull.scala:1218)
    at handleErrorWith @ fs2.Compiler$Target.handleErrorWith(Compiler.scala:161)
    at flatMap @ fs2.Pull$.goCloseScope$1(Pull.scala:1225)
    at get @ fs2.internal.Scope.openScope(Scope.scala:275)

My question is

See more reports of BodyAlreadyConsumedError: https://discord.com/channels/632277896739946517/632286375311573032/1179740665916182598 https://discord.com/channels/632277896739946517/632286375311573032/1066044042095382690 https://discord.com/channels/632277896739946517/632286375311573032/933061105067118622

rossabaker commented 1 month ago

Disclosure: I can't follow Discord links.

Compiling an fs2.Stream[IO, Byte] (which is what the body is) twice is not guaranteed to return the same result. An IO value, sequenced twice, is generally free to do different things.
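
A small sketch of why a second compile can differ, assuming fs2 3.x and cats-effect 3 on the classpath. The mutable queue stands in for a socket buffer; `OneShotBody`, `fakeSocket`, `body`, and `readAll` are hypothetical names for illustration only. The ember parser is more involved than this and, as the stack trace in this issue shows, may raise BodyAlreadyConsumedError instead of returning an empty result.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable

object OneShotBody {
  // Hypothetical stand-in for a socket buffer: bytes leave the queue as they are read.
  private val fakeSocket: mutable.Queue[Byte] =
    mutable.Queue.from("hello".getBytes(UTF_8).toList)

  // Each pull dequeues one byte; None ends the stream once the queue is empty.
  val body: Stream[IO, Byte] =
    Stream
      .repeatEval(IO(if (fakeSocket.isEmpty) None else Some(fakeSocket.dequeue())))
      .unNoneTerminate

  // Compiles the whole stream into a String.
  val readAll: IO[String] =
    body.compile.toVector.map(bytes => new String(bytes.toArray, UTF_8))

  def main(args: Array[String]): Unit = {
    val first  = readAll.unsafeRunSync() // drains the queue
    val second = readAll.unsafeRunSync() // same IO value, nothing left to read
    println(s"first=[$first] second=[$second]")
  }
}
```

Running `main` prints `first=[hello] second=[]`: the same `IO` value, sequenced twice, observes different state each time.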

There's a toStrict method that can be called on requests or responses. Unfortunately, it doesn't change the IO type to something that guarantees repeatability, but it should work for your use case. Be sure to read the fine print on memory: you probably want an EntityLimiter or some other body limiting HTTP proxy in front.
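
A minimal sketch of the toStrict approach, assuming http4s 0.23.x (where `toStrict` takes an `Option[Long]` byte limit) together with http4s-circe. `StrictBodyExample`, `logAndDecode`, and the 1 MiB `maxBodyBytes` cap are hypothetical choices, and `IO.println` stands in for the logger used in the original report.

```scala
import cats.effect.IO
import io.circe.Json
import org.http4s.Response
import org.http4s.circe._

object StrictBodyExample {
  // Hypothetical cap on how much of the body is buffered in memory.
  val maxBodyBytes: Long = 1024L * 1024L

  // Buffers the body once, then reads the buffered copy twice.
  def logAndDecode(response: Response[IO]): IO[Json] =
    response.toStrict(Some(maxBodyBytes)).flatMap { strict =>
      for {
        text <- strict.bodyText.compile.string // first read, from memory
        _    <- IO.println(s"Response body: $text")
        data <- strict.asJsonDecode[Json]      // second read, same buffered bytes
      } yield data
    }
}
```

With a client in scope, this could be used as `httpClient.run(request).use(StrictBodyExample.logAndDecode)`. The byte limit is there because the whole body is held in memory.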

counter2015 commented 1 month ago

@rossabaker Thanks for your reply! I'm not sure if I understand correctly.

fs2.Stream[IO, Byte] (which is what the body is) twice is not guaranteed to return the same result

That is to say, the result may be different for the same input. But I tested it many times, and it always works when requesting the localhost endpoint and fails randomly on the remote endpoint. As far as I can see, it behaves differently depending on the network environment.

Here is some discussion on Discord.

(three images of the Discord discussion)

danicheg commented 1 month ago

the result may be different for the same input. But I tested it many times, and it always works when requesting the localhost endpoint and fails randomly on the remote endpoint. As far as I can see, it behaves differently depending on the network environment.

I'm confident enough to say that this behaviour is quite expected. When you're on the localhost, the network is likely to be reliable (since it's a sort of self-contained network interface rather than a full-fledged network).

counter2015 commented 1 month ago

That's the point that confuses me. Why does the network have any influence on consuming the response body? It wouldn't surprise me if it returned a timeout exception rather than BodyAlreadyConsumedError. Can we say that in a local environment, the BodyAlreadyConsumedError won't be raised?

rossabaker commented 1 month ago

Can we say that in a local environment, the BodyAlreadyConsumedError won't be raised?

Not with certainty. I wouldn't be surprised if this fails if you try it enough times. I wouldn't be surprised if it fails on a different machine. I wouldn't be surprised if it abruptly started failing on the same machine. Once your stream is backed by a TCP socket, the behavior of consuming it twice is undefined. If you want that guarantee, you need to use toStrict, the BodyCache middleware, or some other solution that caches the stream for multiple reads.
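
One way to read "some other solution" is to consume the stream exactly once and work from the in-memory value afterwards. The sketch below assumes http4s 0.23.x, cats-effect 3, and circe-parser on the classpath; `ReadOnceExample` and `logAndParse` are hypothetical names, and `IO.println` stands in for the logger from the original report.

```scala
import cats.effect.IO
import io.circe.Json
import io.circe.parser.parse
import org.http4s.Response

object ReadOnceExample {
  // Reads the body stream a single time, then logs and parses the resulting String.
  def logAndParse(response: Response[IO]): IO[Json] =
    for {
      text <- response.bodyText.compile.string // the only read of the stream
      _    <- IO.println(s"Response body: $text")
      data <- IO.fromEither(parse(text))       // parse the String, not the stream
    } yield data
}
```

Since the socket-backed stream is touched only once, the network timing that made the original code fail intermittently no longer matters for the second step.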

counter2015 commented 1 month ago

ok, got it. Thanks!