matsluni / aws-spi-akka-http

This is an alternative implementation of the new aws java sdk v2 non-blocking async SPI. It provides an alternative to the built-in Netty implementation based on akka-http.
Apache License 2.0

Allow HTTP protocol selection #226

Open jtjeferreira opened 1 year ago

jtjeferreira commented 1 year ago

Allow HTTP protocol selection

fixes #225

matsluni commented 1 year ago

Thanks for the PR @jtjeferreira. I am currently on vacation. I will have a look next week.

jtjeferreira commented 1 year ago

What do you think of adding an integration test, which is using Http2 and also adding a paragraph in the Readme how Http2 can be enabled?

I tried that but only localstack supports http/2...

Do you know which AWS services support Http2?

I could not find any documentation except https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/http2.html, which mentions Kinesis using HTTP/2. However, if I run curl -v --http2 https://kinesis.us-east-1.amazonaws.com it uses HTTP/1.1, whereas curl -v --http2 https://lambda.us-east-1.amazonaws.com uses HTTP/2 🤷
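The same check can be scripted from the JVM. The following is a minimal sketch, assuming JDK 11 or newer; `Http2Probe` and `negotiatedVersion` are illustrative names, not part of this library or the AWS SDK. The JDK client prefers HTTP/2 when asked to and falls back to HTTP/1.1 if the server does not offer it, so the version reported on the response shows what was actually negotiated.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Hypothetical helper for probing which HTTP version an endpoint negotiates.
object Http2Probe {
  // Client that prefers HTTP/2 and falls back to HTTP/1.1 when the server does not offer it.
  val client: HttpClient =
    HttpClient.newBuilder().version(HttpClient.Version.HTTP_2).build()

  // Sends a GET (network access required) and returns the version used for the response.
  def negotiatedVersion(url: String): HttpClient.Version = {
    val request = HttpRequest.newBuilder(URI.create(url)).GET().build()
    client.send(request, HttpResponse.BodyHandlers.discarding()).version()
  }
}
```

Calling `Http2Probe.negotiatedVersion("https://lambda.us-east-1.amazonaws.com")` returns either `HTTP_2` or `HTTP_1_1`, depending on what the endpoint negotiates at the time of the call.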

jtjeferreira commented 1 year ago

Do you understand what the issue with the CI is?

jtjeferreira commented 1 year ago

Hi @matsluni, I added some tests and improved the README

jtjeferreira commented 1 year ago

@matsluni ready for re-review

matsluni commented 1 year ago

I just found this: https://doc.akka.io/docs/akka-http/current/client-side/http2.html. I am wondering now, if our approach here is not really the thing we expect it to do?

jtjeferreira commented 1 year ago

I just found this: doc.akka.io/docs/akka-http/current/client-side/http2.html.

You mean any particular section?

I am wondering now, if our approach here is not really the thing we expect it to do?

Honestly, I am not sure either what to expect. I think that setting the protocol in the akka.http.scaladsl.model.HttpRequest is having an effect. Otherwise, some of the tests (the S3 and SQS tests), where I use http/2 specifically, would not be failing.

Also since this is an opt-in setting and we don't change anything for existing clients, I think this is a safe PR...

I started exploring the internals of this library because I am having some issues with random connection closures, but I think https://github.com/matsluni/aws-spi-akka-http/pull/228 is much more promising in that regard. So if you are unsure about this PR, we can close it...

matsluni commented 1 year ago

Hi @jtjeferreira, I tried the approach I mentioned above in this branch: https://github.com/matsluni/aws-spi-akka-http/tree/protocol_other_approach. What do you think about this?

The local tests using Http/2 now do not finish anymore and therefore I commented them out. Actually, I would expect that the local tests using Http/2 are not working, as the local test services are not exposing Http/2 functionality (TLS etc).

jtjeferreira commented 1 year ago

Hi @jtjeferreira, I tried the approach I mentioned above in this branch: protocol_other_approach. What do you think about this?

I don't think we should use the "Connection-Level Client-Side API" (i.e. Http().connectionTo), because we would be opening a new connection for every request.

Besides, the use of Source.queue is flawed: it does not fail requests properly, and that is why the tests don't finish. We could remove the queue and use:

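  import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
  import akka.stream.Materializer
  import akka.stream.scaladsl.{Flow, Sink, Source}
  import scala.concurrent.Future

  // Sends one request through the connection flow; the Future fails if the stream fails or completes empty.
  def singleRequest(connection: Flow[HttpRequest, HttpResponse, Any])(implicit mat: Materializer): HttpRequest => Future[HttpResponse] = {
    req => {
      Source.single(req)
        .via(connection)
        .runWith(Sink.head)
    }
  }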

The local tests using Http/2 now do not finish anymore and therefore I commented them out.

The local tests are failing because we are not setting the port. We need to do:

Http()
  .connectionTo(request.request().host())
  .toPort(request.request().port())
  .http2()

Actually, I would expect that the local tests using Http/2 are not working, as the local test services are not exposing Http/2 functionality (TLS etc).

You can have HTTP/2 without TLS, and akka supports this with http2WithPriorKnowledge() instead of using http2(). If we modify the code to be

    if (protocol == HttpProtocols.`HTTP/2.0`) {
      val baseConnection = Http().connectionTo(request.request().host()).toPort(request.request().port())
      val connection = request.request().protocol() match {
        case "http" => baseConnection.http2WithPriorKnowledge()
        case "https" => baseConnection.http2()
        case _ => throw new IllegalArgumentException("Unsupported protocol")
      }
      val dispatch = singleRequest(connection)
      runner.run(
        () => dispatch(akkaHttpRequest),
        request.responseHandler()
      )
    }

all tests are green except:

jtjeferreira commented 1 year ago

After writing the above comment and looking at the debug logs of the tests, I realized that my approach of just setting HttpRequest.protocol to HTTP/2.0 only changes the HTTP version sent in the plaintext HTTP/1.x-style request line, and no HTTP/2 features like header compression or multiplexing are being used.

my approach

[DEBUG] [10/31/2023 12:45:37.429] [aws-akka-http-akka.actor.default-dispatcher-7] [akka.stream.Log(akka://aws-akka-http/system/Materializers/StreamSupervisor-3)] [client-plain-text ToNet  ] Element: SendBytes  ByteString(272 bytes) first + last 100:
 47 45 54 20 2F 20 48 54 54 50 2F 32 2E 30 0D 0A  | GET / HTTP/2.0..
 48 6F 73 74 3A 20 6C 6F 63 61 6C 68 6F 73 74 3A  | Host: localhost:
 34 39 35 35 35 0D 0A 61 6D 7A 2D 73 64 6B 2D 69  | 49555..amz-sdk-i
 6E 76 6F 63 61 74 69 6F 6E 2D 69 64 3A 20 31 38  | nvocation-id: 18
 34 62 31 64 32 35 2D 37 63 64 33 2D 30 32 61 33  | 4b1d25-7cd3-02a3
 2D 35 37 31 30 2D 65 37 63 36 31 63 34 61 39 34  | -5710-e7c61c4a94
 63 31 0D 0A                                      | c1..
                     ... [72 bytes omitted] ...
 6E 4A 44 4B 5F 36 34 2D 42 69 74 5F 53 65 72 76  | nJDK_64-Bit_Serv
 65 72 5F 56 4D 2F 31 31 2E 30 2E 32 30 2B 30 20  | er_VM/11.0.20+0
 4A 61 76 61 2F 31 31 2E 30 2E 32 30 20 73 63 61  | Java/11.0.20 sca
 6C 61 2F 32 2E 31 33 2E 31 30 20 76 65 6E 64 6F  | la/2.13.10 vendo
 72 2F 48 6F 6D 65 62 72 65 77 20 69 6F 2F 61 73  | r/Homebrew io/as
 79 6E 63 20 68 74 74 70 2F 55 4E 4B 4E 4F 57 4E  | ync http/UNKNOWN
 0D 0A 0D 0A                                      | ....
[DEBUG] [10/31/2023 12:45:37.433] [aws-akka-http-akka.actor.default-dispatcher-7] [akka.stream.Log(akka://aws-akka-http/system/Materializers/StreamSupervisor-3)] [client-plain-text FromNet] Element: SessionBytes  ByteString(208 bytes) first + last 100:
 48 54 54 50 2F 31 2E 31 20 34 32 36 20 55 70 67  | HTTP/1.1 426 Upg
 72 61 64 65 20 52 65 71 75 69 72 65 64 0D 0A 43  | rade Required..C
 6F 6E 74 65 6E 74 2D 54 79 70 65 3A 20 74 65 78  | ontent-Type: tex
 74 2F 68 74 6D 6C 3B 63 68 61 72 73 65 74 3D 69  | t/html;charset=i
 73 6F 2D 38 38 35 39 2D 31 0D 0A 43 6F 6E 74 65  | so-8859-1..Conte
 6E 74 2D 4C 65 6E 67 74 68 3A 20 35 39 0D 0A 43  | nt-Length: 59..C
 6F 6E 6E 65                                      | onne
                     ... [8 bytes omitted] ...
 6C 6F 73 65 0D 0A 53 65 72 76 65 72 3A 20 4A 65  | lose..Server: Je
 74 74 79 28 39 2E 34 2E 35 31 2E 76 32 30 32 33  | tty(9.4.51.v2023
 30 32 31 37 29 0D 0A 0D 0A 3C 68 31 3E 42 61 64  | 0217)....<h1>Bad
 20 4D 65 73 73 61 67 65 20 34 32 36 3C 2F 68 31  |  Message 426</h1
 3E 3C 70 72 65 3E 72 65 61 73 6F 6E 3A 20 55 70  | ><pre>reason: Up
 67 72 61 64 65 20 52 65 71 75 69 72 65 64 3C 2F  | grade Required</
 70 72 65 3E                                      | pre>

your approach

[DEBUG] [10/31/2023 12:48:07.584] [aws-akka-http-akka.actor.default-dispatcher-9] [akka.stream.Log(akka://aws-akka-http/system/Materializers/StreamSupervisor-3)] [client-plain-text ToNet  ] Element: SendBytes  ByteString(24 bytes)
 50 52 49 20 2A 20 48 54 54 50 2F 32 2E 30 0D 0A  | PRI * HTTP/2.0..
 0D 0A 53 4D 0D 0A 0D 0A                          | ..SM....
[DEBUG] [10/31/2023 12:48:07.584] [aws-akka-http-akka.actor.default-dispatcher-9] [Http2ClientDemux(akka://aws-akka-http)] Changing state from WaitingForNetworkToSendControlFrames to Idle
[DEBUG] [10/31/2023 12:48:07.589] [aws-akka-http-akka.actor.default-dispatcher-9] [akka.stream.Log(akka://aws-akka-http/system/Materializers/StreamSupervisor-3)] [client-plain-text ToNet  ] Element: SendBytes  ByteString(21 bytes)
 00 00 0C 04 00 00 00 00 00 00 03 00 00 01 00 00  | ................
 02 00 00 00 00                                   | .....
[DEBUG] [10/31/2023 12:48:07.589] [aws-akka-http-akka.actor.default-dispatcher-9] [Http2ClientDemux(akka://aws-akka-http)] Changing state from Idle to WaitingForData
[DEBUG] [10/31/2023 12:48:07.590] [aws-akka-http-akka.actor.default-dispatcher-9] [akka.stream.Log(akka://aws-akka-http/system/Materializers/StreamSupervisor-3)] [client-plain-text ToNet  ] Element: SendBytes  ByteString(189 bytes)
 00 00 B4 01 05 00 00 00 01 82 86 41 8B A0 E4 1D  | ...........A....
 13 9D 09 B8 D3 EE 00 5F 84 5C 01 30 40 8F 1D 3E  | ......._.\.0@..>
 D6 44 9D 56 35 5D CE 41 A4 C7 A9 63 49 99 0C 82  | .D.V5].A...cI...
 29 0B 1B 8B 3E 57 82 B4 AE B8 F2 C3 0B 84 58 45  | )...>W........XE
 78 81 19 48 37 07 27 40 8A 1D 3E D6 44 9D 56 B0  | x..H7.'@..>.D.V.
 A9 B3 D7 83 03 00 63 7A E4 1F 84 2C 89 3A AD D0  | ......cz...,.:..
 7D C6 C0 97 08 57 69 4D 03 24 5A B7 45 F8 C0 59  | }....WiM.$Z.E..Y
 5C 4B 85 4D 55 96 AC B7 F3 44 E3 4B 5D 32 62 DC  | \K.MU....D.K]2b.
 5B 3B 96 C8 B8 E8 60 21 5C 0B 88 1F EC 0A 65 1F  | [;....`!\.....e.
 71 B0 10 AE 05 C4 05 10 41 D0 36 04 B8 59 5C 20  | q.......A.6..Y\
 53 B9 6A 90 F6 31 8C F4 96 3B 0B E1 43 1D 81 A3  | S.j..1...;..C...
 D5 44 52 74 A6 B6 38 69 CD A7 57 2D 3F           | .DRt..8i..W-?
[DEBUG] [10/31/2023 12:48:07.595] [aws-akka-http-akka.actor.default-dispatcher-9] [akka.stream.Log(akka://aws-akka-http/system/Materializers/StreamSupervisor-3)] [client-plain-text FromNet] Element: SessionBytes  ByteString(208 bytes) first + last 100:
 48 54 54 50 2F 31 2E 31 20 34 32 36 20 55 70 67  | HTTP/1.1 426 Upg
 72 61 64 65 20 52 65 71 75 69 72 65 64 0D 0A 43  | rade Required..C
 6F 6E 74 65 6E 74 2D 54 79 70 65 3A 20 74 65 78  | ontent-Type: tex
 74 2F 68 74 6D 6C 3B 63 68 61 72 73 65 74 3D 69  | t/html;charset=i
 73 6F 2D 38 38 35 39 2D 31 0D 0A 43 6F 6E 74 65  | so-8859-1..Conte
 6E 74 2D 4C 65 6E 67 74 68 3A 20 35 39 0D 0A 43  | nt-Length: 59..C
 6F 6E 6E 65                                      | onne
                     ... [8 bytes omitted] ...
 6C 6F 73 65 0D 0A 53 65 72 76 65 72 3A 20 4A 65  | lose..Server: Je
 74 74 79 28 39 2E 34 2E 35 31 2E 76 32 30 32 33  | tty(9.4.51.v2023
 30 32 31 37 29 0D 0A 0D 0A 3C 68 31 3E 42 61 64  | 0217)....<h1>Bad
 20 4D 65 73 73 61 67 65 20 34 32 36 3C 2F 68 31  |  Message 426</h1
 3E 3C 70 72 65 3E 72 65 61 73 6F 6E 3A 20 55 70  | ><pre>reason: Up
 67 72 61 64 65 20 52 65 71 75 69 72 65 64 3C 2F  | grade Required</
 70 72 65 3E                                      | pre>
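The difference between the two dumps can be checked mechanically. The following is a minimal sketch, with `Http2Bytes` and its members being illustrative names: it tests whether captured bytes start with the HTTP/2 client connection preface defined in RFC 9113, and decodes the fixed 9-byte header that precedes every HTTP/2 frame.

```scala
// Hypothetical helper for inspecting captured bytes; not part of akka-http or this library.
object Http2Bytes {
  // HTTP/2 client connection preface (RFC 9113): "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".
  val Preface: Array[Byte] = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes("US-ASCII")

  // Parses a whitespace-separated hex dump such as "50 52 49 20" into bytes.
  def fromHex(dump: String): Array[Byte] =
    dump.trim.split("\\s+").map(h => Integer.parseInt(h, 16).toByte)

  // True only if the bytes begin with the full 24-byte preface.
  def startsWithPreface(bytes: Array[Byte]): Boolean =
    bytes.length >= Preface.length && bytes.take(Preface.length).sameElements(Preface)

  final case class FrameHeader(length: Int, frameType: Int, flags: Int, streamId: Int)

  // Decodes the 9-byte frame header: 24-bit length, 8-bit type, 8-bit flags, 31-bit stream id.
  def frameHeader(bytes: Array[Byte]): FrameHeader = {
    require(bytes.length >= 9, "an HTTP/2 frame header is 9 bytes")
    def u(i: Int): Int = bytes(i) & 0xff
    FrameHeader(
      length = (u(0) << 16) | (u(1) << 8) | u(2),
      frameType = u(3),
      flags = u(4),
      streamId = ((u(5) & 0x7f) << 24) | (u(6) << 16) | (u(7) << 8) | u(8)
    )
  }
}
```

Applied to the logs above, the first 24 bytes sent under "your approach" are exactly the preface, while the bytes sent under "my approach" begin with an HTTP/1.x-style request line (`GET / HTTP/2.0`). The 21-byte message starts with a header for a SETTINGS frame (type 4, 12-byte payload, stream 0), and the 189-byte message with a header for a HEADERS frame (type 1, 180-byte payload, flags 0x05, stream 1).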

So I agree that we need to use the "Connection-Level Client-Side API", but we need to find a way so that the connection is not recreated every time...
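One possible direction for reusing connections is to keep one long-lived entry per host and port. The following is a minimal sketch, not a tested design: `PerAuthorityCache` is a hypothetical helper, and the cached value `C` is assumed to be something already materialized (for example a dispatch function of type `HttpRequest => Future[HttpResponse]` backed by a running stream), since caching only the `Flow` blueprint would still open a new connection on every materialization. Eviction of failed or closed connections is deliberately left out.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper, not part of akka-http or this library.
final class PerAuthorityCache[C](create: (String, Int) => C) {
  private val entries = new ConcurrentHashMap[(String, Int), C]()

  // computeIfAbsent runs `create` at most once per key, so concurrent callers share one entry.
  def get(host: String, port: Int): C =
    entries.computeIfAbsent((host, port), key => create(key._1, key._2))
}
```

A caller would construct the cache once per client and pass a factory that opens and materializes the connection for a given host and port; every later request to the same authority then receives the same entry.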

jtjeferreira commented 1 year ago

Honestly, I think we can close this, and revisit this when akka-http provides http/2 support in the "Request-Level Client-Side API"

pjfanning commented 9 months ago

@jtjeferreira @matsluni I logged https://github.com/apache/incubator-pekko-http/issues/483 - is this the missing feature that you need?

jtjeferreira commented 9 months ago

@jtjeferreira @matsluni I logged apache/incubator-pekko-http#483 - is this the missing feature that you need?

My last comment referred to another limitation:

Honestly, I think we can close this, and revisit this when akka-http provides http/2 support in the "Request-Level Client-Side API"

i.e., currently HTTP/2 is only supported at the Connection-Level API:

val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
  Http().connectionTo("localhost").http2()

but it should support the Request-Level API:

val httpResponseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://localhost"))
// hypothetical API
val http2ResponseFuture: Future[HttpResponse] = Http2().singleRequest(HttpRequest(uri = "http://localhost"))