akka / akka-http

The Streaming-first HTTP server/module of Akka
https://doc.akka.io/libraries/akka-http/current/

Investigate potential connection leak in HTTP/1.1 when HTTP/2 is enabled #3963

Open jrudolph opened 2 years ago

jrudolph commented 2 years ago

Found while running ClientServerSpec: the test "complete a request/response when request has Connection: close set" fails with leaked stages:

Details ``` activeShells (actor: akka://ClientServerSpecBase/system/StreamSupervisor-0/flow-11-0-unnamed) GraphInterpreterShell( logics: [ ServerImpl.netIn attrs: [Name(SubSink%28ServerImpl.netIn%29), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)], ServerImpl.netOut attrs: [Name(SubSource%28ServerImpl.netOut%29), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)], MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92) attrs: [Name(mapAsyncUnordered), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)], One2OneBidi attrs: [Name(One2OneBidi), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)], akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460 attrs: [Name(RequestTimeoutSupport), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)], Identity attrs: [Name(identityOp), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)], 
akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320 attrs: [CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)], akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175 attrs: [Name(ControllerStage), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)], akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062 attrs: [Name(renderer), Name(renderer), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)] Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018) attrs: [Name(map), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)], HttpRequestParser attrs: [Name(HttpRequestParser), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), 
SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)], akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79 attrs: [Name(ProtocolSwitchStage), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)], Map(SendBytes) attrs: [Name(map), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)], Collect attrs: [Name(collect), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)], Log attrs: [LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)], Log attrs: [LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), SupervisionStrategy(), 
Dispatcher(akka.actor.default-dispatcher)], Identity attrs: [Name(identityOp), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)], akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509 attrs: [CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(), InputBuffer(4,16), SupervisionStrategy(), Dispatcher(akka.actor.default-dispatcher)] ], connections: [ Connection(0, ServerImpl.netIn, Identity, Closed) Connection(1, MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92), One2OneBidi, Closed) Connection(2, One2OneBidi, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, Closed) Connection(3, One2OneBidi, MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92), Closed) Connection(4, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320, Closed) Connection(5, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, One2OneBidi, Closed) Connection(6, Identity, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, Closed) Connection(7, akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Closed) Connection(8, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018), Closed) Connection(9, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Identity, Closed) Connection(10, 
akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Closed) Connection(11, Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018), HttpRequestParser, Closed) Connection(12, HttpRequestParser, akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, Closed) Connection(13, akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062, Closed) Connection(14, akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, Collect, Closed) Connection(15, Map(SendBytes), akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, Closed) Connection(16, Collect, Log, Closed) Connection(17, Log, Map(SendBytes), Closed) Connection(18, Log, akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509, Closed) Connection(19, Identity, Log, Closed) Connection(20, akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509, ServerImpl.netOut, ShouldPush ] ) dot format graph for deadlock analysis: ================================================================ digraph waits { N0 [label="ServerImpl.netIn"]; N1 [label="ServerImpl.netOut"]; N2 [label="MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92)"]; N3 [label="One2OneBidi"]; N4 [label="akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460"]; N5 [label="Identity"]; N6 [label="akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320"]; N7 [label="akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175"]; N8 [label="akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062"]; N9 [label="Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018)"]; N10 
[label="HttpRequestParser"]; N11 [label="akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79"] N12 [label="Map(SendBytes)"]; N13 [label="Collect"]; N14 [label="Log"]; N15 [label="Log"]; N16 [label="Identity"]; N17 [label="akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509"]; N0 -> N16 [style=dotted, label=closed, dir=both]; N2 -> N3 [style=dotted, label=closed, dir=both]; N3 -> N4 [style=dotted, label=closed, dir=both]; N3 -> N2 [style=dotted, label=closed, dir=both]; N4 -> N6 [style=dotted, label=closed, dir=both]; N4 -> N3 [style=dotted, label=closed, dir=both]; N5 -> N4 [style=dotted, label=closed, dir=both]; N6 -> N7 [style=dotted, label=closed, dir=both]; N7 -> N9 [style=dotted, label=closed, dir=both]; N7 -> N5 [style=dotted, label=closed, dir=both]; N8 -> N7 [style=dotted, label=closed, dir=both]; N9 -> N10 [style=dotted, label=closed, dir=both]; N10 -> N11 [style=dotted, label=closed, dir=both]; N11 -> N8 [style=dotted, label=closed, dir=both]; N11 -> N13 [style=dotted, label=closed, dir=both]; N12 -> N11 [style=dotted, label=closed, dir=both]; N13 -> N15 [style=dotted, label=closed, dir=both]; N14 -> N12 [style=dotted, label=closed, dir=both]; N15 -> N17 [style=dotted, label=closed, dir=both]; N16 -> N14 [style=dotted, label=closed, dir=both]; N17 -> N1 [label=shouldPush, color=red]; } ================================================================ ```
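
The dump is hard to scan by eye, so here is a minimal sketch for pulling the non-closed edges out of the dot graph. The helper name `open_edges` and the regular expressions are assumptions made for illustration and are not part of akka-http; they rely only on the `N… [label="…"]` and `N… -> N… [… label=…]` shapes visible in the dump. The sample is an abbreviated excerpt of the graph above.

```python
import re

# Node declarations look like: N17 [label="...DelayCancellationStage@5ddfe509"];
NODE_RE = re.compile(r'(N\d+)\s*\[label="([^"]*)"\]')
# Edges look like: N17 -> N1 [label=shouldPush, color=red];
EDGE_RE = re.compile(r'(N\d+)\s*->\s*(N\d+)\s*\[([^\]]*)\]')
LABEL_RE = re.compile(r'label=(\w+)')


def open_edges(dot_text):
    """Return (upstream, downstream, state) for every edge not labelled 'closed'."""
    names = dict(NODE_RE.findall(dot_text))
    result = []
    for src, dst, attrs in EDGE_RE.findall(dot_text):
        match = LABEL_RE.search(attrs)
        state = match.group(1) if match else "unknown"
        if state.lower() != "closed":
            result.append((names.get(src, src), names.get(dst, dst), state))
    return result


# Abbreviated excerpt of the "digraph waits" section of the dump.
SAMPLE = '''
digraph waits {
N1 [label="ServerImpl.netOut"];
N15 [label="Log"];
N17 [label="akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509"];
N15 -> N17 [style=dotted, label=closed, dir=both];
N17 -> N1 [label=shouldPush, color=red];
}
'''

for upstream, downstream, state in open_edges(SAMPLE):
    print(f"{upstream} -> {downstream}: {state}")
```

In the full dump every connection is `Closed` except Connection 20, from `DelayCancellationStage` to `ServerImpl.netOut`, which is in state `ShouldPush`.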
jrudolph commented 2 years ago

This is likely a leak in ProtocolSwitch, where not all substreams are closed before the stage terminates. It seems to be fixed with Akka 2.6.x, which automatically closes all substreams that would otherwise be left hanging. It would still be good to fix it properly.

jrudolph commented 2 years ago

Let's not fix this right now, as Akka 2.5 itself is not really supported any more. We can keep the ticket open until we reach 10.3, where we will drop Akka 2.5 support completely, at which point the issue will have resolved itself.

In the meantime, if someone actually runs into the problem with 2.5, we can still try to see what needs to be done in ProtocolSwitch to resolve the problem.

jrudolph commented 2 years ago

Investigation branch: https://github.com/akka/akka-http/compare/main...jrudolph:3963-try-to-harden-protocolswitch