akka / akka-http

The Streaming-first HTTP server/module of Akka
https://doc.akka.io/libraries/akka-http/current/

Expect: 100-Continue is not functioning when an unmarshaller inspecting content-type is present #502

Closed: devzer01 closed this 8 years ago

devzer01 commented 8 years ago

I used the following code (at the bottom) to start an akka-http server, and I am unable to verify that support for the `Expect: 100-Continue` header works as expected.

I am using akka-http 2.4.7

```scala
scalaVersion := "2.11.0"
libraryDependencies += "com.typesafe.akka" %% "akka-http-core" % "2.4.7"
libraryDependencies += "com.typesafe.akka" %% "akka-http-experimental" % "2.4.7"
```

After starting the server, I executed the following sequence:

```
Nayanas-MacBook-Pro:main nayana$ telnet localhost 8080
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
POST /hello HTTP/1.1
Host: localhost
Content-Length: 200
Expect: 100-Continue

HTTP/1.1 200 OK
Server: akka-http/2.4.7
Date: Wed, 09 Nov 2016 15:58:10 GMT
Connection: close
Content-Type: text/plain; charset=UTF-8
Content-Length: 5
```

As you can see, Akka does not send the expected `100 Continue` response. Making the same request to Apache running on my localhost, I see the following:

```
Nayanas-MacBook-Pro:main nayana$ telnet localhost 80
Trying ::1...
Connected to localhost.
Escape character is '^]'.
POST /hello HTTP/1.1
Host: localhost
Content-Length: 200
Expect: 100-Continue

HTTP/1.1 100 Continue
```

```scala
package org.zero

/**
  * Created by nayana on 11/9/16.
  */
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer

import scala.concurrent.Future
import scala.io.StdIn

object WebServer {

  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val route =
      path("hello") {
        post {
          // the request entity is never read by this route
          complete(Future.successful(Option("Hello")))
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}
```

I see this on the server console when this happens:

```
[WARN] [11/09/2016 23:04:25.752] [default-akka.actor.default-dispatcher-4] [akka.actor.ActorSystemImpl(default)] Sending 2xx response before end of request was received...
Note that the connection will be closed after this response. Also, many clients will not read early responses!
Consider waiting for the request end before dispatching this response!
```

devzer01 commented 8 years ago

I looked at the code that generates that log; it is as follows:

```scala
if (isEarlyResponse && response.status.isSuccess)
  log.warning(
    "Sending an 2xx 'early' response before end of request was received... " +
      "Note that the connection will be closed after this response. Also, many clients will not read early responses! " +
      "Consider only issuing this response after the request data has been completely read!")
val close = requestStart.closeRequested ||
  (requestStart.expect100Continue && oneHundredContinueResponsePending) ||
  (isClosed(requestParsingIn) && openRequests.isEmpty) ||
  isEarlyResponse

emit(responseCtxOut, ResponseRenderingContext(response, requestStart.method, requestStart.protocol, close),
  pullHttpResponseIn)
```

I don't see where it is supposed to send a `100 Continue` response in that code.

johanandren commented 8 years ago

The problem here is that you do not actually consume the request entity in any way, you just match on the method POST and then complete the request with OK, which is exactly what you see in the client. Try consuming the request entity in any way and you will see that it works as you expect.

johanandren commented 8 years ago

To clarify, that would be something like:

```scala
val route =
  post {
    entity(as[String]) { bodyAsString =>
      complete("got it")
    }
  }
```

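A stand-alone model (plain Scala, no akka dependency) of why consuming the entity matters. It assumes, as described above, that the interim `100 Continue` goes out only once the route starts reading the body, and it borrows the close rule from the quoted `close` condition: a final response sent while the `100 Continue` is still pending closes the connection. `ContinueModel`, `Exchange`, `readBody` and `respond` are hypothetical names, not akka-http API.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical model of one HTTP/1.1 exchange with Expect: 100-continue; not akka-http code.
object ContinueModel {
  final class Exchange(expect100: Boolean, body: Iterator[String]) {
    val wire: ListBuffer[String] = ListBuffer.empty[String] // what the server writes, line by line
    private var continuePending = expect100

    // The route reading the body is what releases the interim 100 Continue.
    def readBody(): String = {
      if (continuePending) {
        wire += "HTTP/1.1 100 Continue"
        continuePending = false
      }
      body.mkString
    }

    // A final response while 100 Continue is still pending also closes the connection,
    // like `expect100Continue && oneHundredContinueResponsePending` in the quoted close condition.
    def respond(status: String): Unit = {
      wire += s"HTTP/1.1 $status"
      if (continuePending) wire += "Connection: close"
    }
  }
}
```

In this model, responding without reading the body yields `HTTP/1.1 200 OK` followed by `Connection: close`, similar to the first telnet session; reading the body first emits the `100 Continue` line before the final response.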
devzer01 commented 8 years ago

Sure, let me try that first. I do have endpoints in a larger project that expect payloads in the route definition; I wanted to create a smaller example to test the issue. Let me do as you said and report back. Thanks @johanandren

devzer01 commented 8 years ago

I can still reproduce this issue in the original project; the root cause may be an unmarshaller.

I have a route defined as follows:

```scala
((post | put) & entity(as[FooBar])) { fooBar =>
  complete { saveFoo(fooBar) }
}
```

There is an unmarshaller defined as follows:


```scala
val protobufContentType = ContentType(MediaType.applicationBinary("octet-stream", Compressible, "proto"))
val applicationJsonContentType = ContentTypes.`application/json`

implicit val um: FromEntityUnmarshaller[FooBar] = {
  Unmarshaller.withMaterializer[HttpEntity, FooBar](_ => implicit mat => {
    case entity @ HttpEntity.Strict(`applicationJsonContentType`, data) =>
      val charBuffer = Unmarshaller.bestUnmarshallingCharsetFor(entity)
      FastFuture.successful(JsonFormat.fromJsonString(data.decodeString(charBuffer.nioCharset().name()))(FooBar))
    case entity @ HttpEntity.Strict(`protobufContentType`, data) =>
      FastFuture.successful(PropertyEntity.parseFrom(CodedInputStream.newInstance(data.asByteBuffer)))
    case entity =>
      Future.failed(UnsupportedContentTypeException(applicationJsonContentType, protobufContentType))
  })
}
```

I see the following response:

```
Escape character is '^]'.
POST /v1/property/details/ HTTP/1.1
Host: localhost
Content-Length: 10
Expect: 100-continue
Accept: application/octet-stream
Content-Type: application/octet-stream

HTTP/1.1 415 Unsupported Media Type
Server: akka-http/2.4.11
Date: Wed, 09 Nov 2016 18:07:18 GMT
Connection: close
Content-Type: text/plain; charset=UTF-8
Content-Length: 99

The request's Content-Type is not supported. Expected:
application/json or application/octet-stream
```

@johanandren

devzer01 commented 8 years ago

Here is fully reproducible code:

```scala
package org.zero

import java.nio.ByteBuffer

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.MediaType.Compressible
import akka.http.scaladsl.model.{ContentType, ContentTypes, HttpEntity, MediaType}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
import akka.http.scaladsl.unmarshalling.Unmarshaller.UnsupportedContentTypeException
import akka.http.scaladsl.util.FastFuture
import akka.stream.ActorMaterializer

import scala.concurrent.Future
import scala.io.StdIn

object WebServer {

  def main(args: Array[String]): Unit = {

    val protobufContentType = ContentType(MediaType.applicationBinary("octet-stream", Compressible, "proto"))
    val applicationJsonContentType = ContentTypes.`application/json`

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher

    // Only HttpEntity.Strict is matched; any other (streamed) entity falls through to the 415 case.
    implicit val um1: FromEntityUnmarshaller[ProtoBufFoo] = {
      Unmarshaller.withMaterializer[HttpEntity, ProtoBufFoo](_ => implicit mat => {
        case entity @ HttpEntity.Strict(`applicationJsonContentType`, data) =>
          FastFuture.successful(ProtoBufFoo.parseFrom(data.asByteBuffer))
        case entity @ HttpEntity.Strict(`protobufContentType`, data) =>
          FastFuture.successful(ProtoBufFoo.parseFrom(data.asByteBuffer))
        case entity =>
          Future.failed(UnsupportedContentTypeException(applicationJsonContentType, protobufContentType))
      })
    }

    val route =
      path("hello") {
        post {
          entity(as[ProtoBufFoo]) { foo =>
            complete("got it")
          }
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}

// Stub standing in for a generated protobuf class
case class ProtoBufFoo(id: Int, name: String)

object ProtoBufFoo {
  def parseFrom(bf: ByteBuffer): ProtoBufFoo = {
    ProtoBufFoo(1, "foo")
  }
}
```

devzer01 commented 8 years ago

@johanandren (ping) I am going to bed soon; it's almost 2 AM. I will check in the morning. If there is a better way to write the unmarshaller to avoid this issue, I am happy to take that route. If I understood more of the underlying Akka, I would be most happy to submit a PR to fix this issue.

johanandren commented 8 years ago

From RFC 2616, section 8.2.3:

  • Upon receiving a request which includes an Expect request-header field with the "100-continue" expectation, an origin server MUST either respond with 100 (Continue) status and continue to read from the input stream, or respond with a final status code. The origin server MUST NOT wait for the request body before sending the 100 (Continue) response. If it responds with a final status code, it MAY close the transport connection or it MAY continue

The server does not accept the content type of the request and replies early with a final error status. As far as I can see, that is correct and in accordance with the spec?
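The quoted requirement can be read as a decision the server makes from headers alone. The following is a minimal model under two assumptions: the only check is on the Content-Type media type, and `supported` holds lower-case media types. `ExpectDecision`, `firstResponse` and the result names are hypothetical.

```scala
// Hypothetical header-only decision for a request that may carry Expect: 100-continue.
object ExpectDecision {
  sealed trait FirstResponse
  case object Continue100 extends FirstResponse             // send "100 Continue", then read the body
  final case class Final(status: Int) extends FirstResponse // final status, body never needed
  case object ReadBody extends FirstResponse                // no expectation: just read the body

  def firstResponse(headers: Map[String, String], supported: Set[String]): FirstResponse = {
    val h = headers.map { case (k, v) => k.toLowerCase -> v.trim }
    // Compare media types only, ignoring parameters such as "; charset=UTF-8".
    val mediaType = h.get("content-type").map(_.split(';').head.trim.toLowerCase)
    val expects100 = h.get("expect").exists(_.equalsIgnoreCase("100-continue"))
    if (!mediaType.exists(supported.contains)) Final(415) // decided without any body bytes
    else if (expects100) Continue100
    else ReadBody
  }
}
```

With the headers from the second session (`Expect: 100-continue`, `Content-Type: application/octet-stream`) and both types supported, this model answers `Continue100`; only an unsupported or missing type leads to an early `Final(415)`.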

devzer01 commented 8 years ago

The server decides this because the unmarshaller logic requires the payload to be present to check its condition. For `Expect: 100-continue`, I don't believe the unmarshaller logic should kick in until the payload is present.

Why is the unmarshaller invoked when the server is already aware that expect100Continue = true?

johanandren commented 8 years ago

It doesn't have to wait for the payload if all you do is look at the header. Additionally, with 100-continue I don't think the entity will ever be strict, since you are actually streaming content; so the fact that your unmarshaller only matches `HttpEntity.Strict` could be the bug that makes 100-continue requests always fail with HTTP 415.
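Johan's diagnosis can be reproduced without akka. In this hypothetical model, `Strict` and `Streamed` stand in for `HttpEntity.Strict` and a streamed entity (they are not the akka-http types), and `strictOnly` has the same shape as the unmarshaller in the reproducer: it matches only strict entities.

```scala
// Hypothetical stand-ins for a fully buffered and a streamed entity; not akka-http types.
object StrictOnlyBug {
  sealed trait Entity { def contentType: String }
  final case class Strict(contentType: String, data: Vector[Byte]) extends Entity
  final case class Streamed(contentType: String, chunks: Iterator[Vector[Byte]]) extends Entity

  val Protobuf = "application/octet-stream"

  // Same shape as the unmarshaller in the thread: only Strict entities are pattern-matched,
  // so a streamed body falls through to the rejection even when its type is supported.
  def strictOnly(e: Entity): Either[String, Int] = e match {
    case Strict(Protobuf, data) => Right(data.length) // stand-in for parsing: report the body size
    case _                      => Left("415 Unsupported Media Type")
  }
}
```

A streamed body with the supported `application/octet-stream` type still ends up in the 415 branch, which matches the behavior reported above.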

devzer01 commented 8 years ago

So what you are saying is that it is the unmarshaller's responsibility to handle the Expect header and proceed accordingly? But I still don't understand why the unmarshaller is called at this point. The only explanation I can think of is that, given its async nature, Akka starts the request logic even before the payload has arrived. Looking at the call stack from the exception point, it is very clear that the request entered the execution path of all the directives before it was fully received.

So what is the Akka-defined way to handle this requirement? Can an unmarshaller return a callback once the message data is available?

devzer01 commented 8 years ago

Using something like this?

```scala
/**
 * Collects all possible parts and returns a potentially future Strict entity for easier processing.
 * The Future is failed with an TimeoutException if the stream isn't completed after the given timeout.
 */
def toStrict(timeout: FiniteDuration)(implicit fm: Materializer): Future[HttpEntity.Strict] =
  dataBytes
    .via(new akka.http.impl.util.ToStrict(timeout, contentType))
    .runWith(Sink.head)
```

johanandren commented 8 years ago

If you want to provide an HttpEntity unmarshaller and you do not want to return 415 for non-strict requests, it also needs to handle non-strict (streamed) content. Look at the sources of akka.http.scaladsl.unmarshalling.PredefinedFromEntityUnmarshallers.byteStringUnmarshaller for an example of this.
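One way to follow that advice, sketched with hypothetical stand-in types (not akka-http classes): reduce both entity shapes to one byte buffer before any parsing. The fold over chunks plays the role that folding `dataBytes` plays in akka-http, as in the `runFold` code that appears later in this thread.

```scala
// Hypothetical entity model (no akka dependency): a body either arrives whole or in chunks.
object BothShapes {
  sealed trait Entity { def contentType: String }
  final case class Strict(contentType: String, data: Vector[Byte]) extends Entity
  final case class Streamed(contentType: String, chunks: Iterator[Vector[Byte]]) extends Entity

  // Normalize both shapes to a single buffer, so a parser written against
  // bytesOf no longer depends on which shape arrived.
  def bytesOf(e: Entity): Vector[Byte] = e match {
    case Strict(_, data)     => data
    case Streamed(_, chunks) => chunks.foldLeft(Vector.empty[Byte])(_ ++ _)
  }
}
```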

devzer01 commented 8 years ago

ok thanks

devzer01 commented 8 years ago

I was able to handle the happy-path case by adding the following code:

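```scala
case entity =>
  // streamed entity: fold all chunks into one ByteString, then parse
  // (needs an implicit Materializer and ExecutionContext in scope)
  entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map { b =>
    ProtoBufFoo.parseFrom(b.asByteBuffer)
  }
```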

But it needs further expansion so that it can match against the request content type, the payload length, etc., and also fail with an exception when the content does not match. Thanks again for your help.
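The checks listed above can be ordered so that the content type is validated from the header first and the body is collected only when a parser exists for that type. A minimal model with placeholder parsers; `DispatchByType`, `parsers` and `UnsupportedType` are hypothetical names, no akka types are used, and a payload-length check is left out.

```scala
import scala.util.Try

// Hypothetical model: a request entity whose body arrives in chunks; not akka-http types.
object DispatchByType {
  final case class Entity(contentType: String, chunks: Iterator[Vector[Byte]])

  final case class UnsupportedType(got: String, supported: Seq[String])
    extends Exception(s"Unsupported Content-Type $got, expected ${supported.mkString(" or ")}")

  // Placeholder parsers keyed by media type; real code would plug in JSON / protobuf decoders.
  val parsers: Map[String, Vector[Byte] => String] = Map(
    "application/json"         -> ((b: Vector[Byte]) => "json:" + new String(b.toArray, "UTF-8")),
    "application/octet-stream" -> ((b: Vector[Byte]) => "proto:" + b.length + " bytes")
  )

  def unmarshal(e: Entity): Try[String] = Try {
    // 1. decide from the header alone; an unsupported type fails before any chunk is read
    val parse = parsers.getOrElse(e.contentType, throw UnsupportedType(e.contentType, parsers.keys.toSeq))
    // 2. only now collect the body, once, and hand it to the chosen parser
    parse(e.chunks.foldLeft(Vector.empty[Byte])(_ ++ _))
  }
}
```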

devzer01 commented 8 years ago

@johanandren I solved my problem using the following unmarshaller code:

```scala
Unmarshaller.withMaterializer[HttpEntity, T](implicit ec => implicit mat => {
  case entity @ HttpEntity.Strict(`applicationJsonContentType`, data) =>
    val charset = Unmarshaller.bestUnmarshallingCharsetFor(entity)
    FastFuture.successful(JsonFormat.fromJsonString(data.decodeString(charset.nioCharset().name()))(companion))
  case entity @ HttpEntity.Strict(`protobufContentType`, data) =>
    FastFuture.successful(companion.parseFrom(CodedInputStream.newInstance(data.asByteBuffer)))
  case entity => entity.contentType match {
    // streamed entity: collect all chunks first, then parse (ec is needed for map)
    case `applicationJsonContentType` =>
      entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map { b =>
        val charset = Unmarshaller.bestUnmarshallingCharsetFor(entity)
        JsonFormat.fromJsonString(b.decodeString(charset.nioCharset().name()))(companion)
      }
    case `protobufContentType` =>
      entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map { b =>
        companion.parseFrom(CodedInputStream.newInstance(b.asByteBuffer))
      }
    case _ => Future.failed(UnsupportedContentTypeException(applicationJsonContentType, protobufContentType))
  }
})
```

Given that this part of the code is still tagged as experimental, is there any hope of delaying the unmarshaller until the full payload has arrived?

johanandren commented 8 years ago

Not sure what you mean with delaying the unmarshaller?

johanandren commented 8 years ago

I know that the API and what it is capable of is a bit hard to grasp at first, but you should be able to compose existing behaviours with your custom logic to a much larger extent than you do in the sample.

It could look something like this:

```scala
import Unmarshaller._

def protobufUnmarshaller[T] = Unmarshaller.byteStringUnmarshaller
  .forContentTypes(yourProtobufType)
  .map(bytestring => parseBytesWithProtobufTo[T](bytestring))

def jsonUnmarshaller[T] = ??? // best would likely be to use existing json marshallers

def unmarshaller[T]: FromEntityUnmarshaller[T] =
  Unmarshaller.firstOf[HttpEntity, T](
    protobufUnmarshaller[T],
    jsonUnmarshaller[T]
  )
```

I'm going to close this ticket as it is more of a question than a bug or an improvement request, please use the akka user mailing list https://groups.google.com/forum/#!forum/akka-user or the user gitter channel for questions: https://gitter.im/akka/akka

Thanks
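The `firstOf` composition above can be modeled in plain Scala to show the intended behavior: each candidate accepts one content type, and the composite tries them in order, moving past candidates that reject the type and reporting every supported type if none match. This is a simplified sketch with hypothetical names (`FirstOfModel`, `forType`, `Unsupported`); it is not akka-http's implementation, and the error aggregation here is an assumption.

```scala
// Hypothetical model of composing content-type-specific decoders; not akka-http code.
object FirstOfModel {
  final case class Entity(contentType: String, body: String)
  final case class Unsupported(supported: Set[String])
    extends Exception(s"expected one of ${supported.mkString(", ")}")

  type Decoder[T] = Entity => Either[Throwable, T]

  // Restrict a body decoder to a single content type, in the spirit of `forContentTypes`.
  def forType[T](ct: String)(f: String => T): Decoder[T] =
    e => if (e.contentType == ct) Right(f(e.body)) else Left(Unsupported(Set(ct)))

  // Try decoders in order; skip those rejecting the content type, stop at any other result.
  def firstOf[T](ds: Decoder[T]*): Decoder[T] = e => {
    @annotation.tailrec
    def loop(rest: List[Decoder[T]], seen: Set[String]): Either[Throwable, T] = rest match {
      case Nil => Left(Unsupported(seen))
      case d :: tail =>
        d(e) match {
          case Left(Unsupported(s)) => loop(tail, seen ++ s)
          case other                => other
        }
    }
    loop(ds.toList, Set.empty)
  }
}
```

Usage: `firstOf(forType("application/octet-stream")(parseProto), forType("application/json")(parseJson))`, where `parseProto` and `parseJson` are placeholders for real decoders.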

devzer01 commented 8 years ago

@johanandren it's very possible that my references/opinions are not aligned with the reactive approach and the Akka way of doing things; I see that this unmarshaller functionality is triggered from Akka Streams, etc.

My question was simply whether, for HTTP/1 requests, Akka is able to delay the unmarshaller logic until the full request has been received. I think I understand why it is triggered the way it is: so that streaming content can be handled easily.

ok no problem :)

devzer01 commented 8 years ago

No problem, thanks for bearing with me :) As I posted a few hours ago, I already have the unmarshaller working as I wanted, but yes, I agree it's messy and can probably be simplified :) Thanks again.