apple / swift-nio

Event-driven network application framework for high performance protocol servers & clients, non-blocking.
https://swiftpackageindex.com/apple/swift-nio/documentation
Apache License 2.0

NIOAsyncChannel doesn't flush all its writes if connection is closed #2757

Open adam-fowler opened 1 week ago

adam-fowler commented 1 week ago

Expected behavior

That all writes via NIOAsyncChannel.output.write are flushed when a connection is closed.

Actual behavior

If I write a large buffer of over 1MB and immediately exit the closure passed to NIOAsyncChannel.executeThenClose, the buffer doesn't get flushed.

Here is the related Slack conversation https://swift-open-source.slack.com/archives/C9MMT6VGB/p1713405447642699

Steps to reproduce

  1. Create an HTTP/1 server using NIOAsyncChannel that returns 1MB for every request
  2. curl -H "Connection: close" localhost:8888 --output -

SwiftNIO version/commit hash

2.67.0

System & version information

swift-driver version: 1.90.11.1 Apple Swift version 5.10 (swiftlang-5.10.0.13 clang-1500.3.9.4) Target: arm64-apple-macosx14.0

FranzBusch commented 1 week ago

Was outbound half closure enabled on the channel?

Lukasa commented 1 week ago

It should always be enabled.

adam-fowler commented 1 week ago

> Was outbound half closure enabled on the channel?

Yes, I've always used half closure

FranzBusch commented 4 days ago

@adam-fowler Could you do me a favour and add a reproducer test for this? I think I know what's going on, and the solution might be more interesting. My gut feeling is that we buffer the writes somewhere: since writer.write doesn't wait for the write to make it out, we can get the close before all writes have been flushed. This means that some buffered writes might get dropped.

Now, solving this might be a little complicated. The easy way would be to just attach a promise to each write, but that's going to be very costly. @Lukasa and I have discussed a "soft-close" mode a few times, which tells the pipeline to flush everything out and then close when possible.
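To make the suspected failure mode concrete, here is a toy model in plain Swift. It is only a sketch of the idea: ToyConnection, its methods, and respond(using:) are all hypothetical names, not SwiftNIO APIs, and the real pipeline may well get part of the data out before the close arrives. It assumes writes are buffered without waiting, a hard close drops whatever is still buffered, and a "soft close" drains the buffer first.

```swift
// Toy model only: every type and function here is hypothetical, not a SwiftNIO API.
final class ToyConnection {
    private(set) var pending: [[UInt8]] = []  // writes accepted but not yet sent
    private(set) var sentByteCount = 0        // bytes that reached the "socket"
    private(set) var isClosed = false

    /// Accepts a write without waiting for it to be sent.
    func write(_ bytes: [UInt8]) {
        guard !isClosed else { return }
        pending.append(bytes)
    }

    /// Sends everything that is currently buffered.
    func flush() {
        for chunk in pending { sentByteCount += chunk.count }
        pending.removeAll()
    }

    /// Hard close: anything still buffered is dropped.
    func close() {
        pending.removeAll()
        isClosed = true
    }

    /// "Soft close": drain the buffer first, then close.
    func softClose() {
        flush()
        close()
    }
}

/// Writes a response head and a 1,000,000-byte body, closes the connection
/// with the supplied strategy, and returns how many bytes were actually sent.
func respond(using closeConnection: (ToyConnection) -> Void) -> Int {
    let connection = ToyConnection()
    connection.write(Array("HTTP/1.1 200 OK\r\n\r\n".utf8))
    connection.write([UInt8](repeating: 0, count: 1_000_000))
    closeConnection(connection)
    return connection.sentByteCount
}

let hardClosed = respond { $0.close() }
let softClosed = respond { $0.softClose() }
print("hard close sent \(hardClosed) bytes, soft close sent \(softClosed) bytes")
```

In this model the per-write promise approach would correspond to calling flush() and waiting after every write, which is why it costs more than draining once at close time.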

adam-fowler commented 4 days ago

Jesus, making me do half your work 🤣 . Here's a snippet

import NIOPosix
import NIOCore
import NIOHTTP1

/// Sendable server response that doesn't use ``IOData``
public typealias SendableHTTPServerResponsePart = HTTPPart<HTTPResponseHead, ByteBuffer>

/// Channel handler that converts the Sendable type SendableHTTPServerResponsePart to HTTPServerResponsePart
final class HTTPSendableResponseChannelHandler: ChannelOutboundHandler, RemovableChannelHandler {
    typealias OutboundIn = SendableHTTPServerResponsePart
    typealias OutboundOut = HTTPServerResponsePart

    func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        let part = unwrapOutboundIn(data)
        switch part {
        case .head(let head):
            context.write(self.wrapOutboundOut(.head(head)), promise: promise)
        case .body(let buffer):
            context.write(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise)
        case .end:
            context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: promise)
        }
    }
}

@available(macOS 14, *)
func server() async throws {
    let asyncChannel = try await ServerBootstrap(group: MultiThreadedEventLoopGroup.singleton)
        // Enable SO_REUSEADDR for the server itself
        .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
        .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
        .bind(host: "127.0.0.1", port: 8888, serverBackPressureStrategy: nil) { channel in
            return channel.eventLoop.makeCompletedFuture {
                try channel.pipeline.syncOperations.configureHTTPServerPipeline(
                    withPipeliningAssistance: false, // HTTP is pipelined by NIOAsyncChannel
                    withErrorHandling: true,
                    withOutboundHeaderValidation: false // Swift HTTP Types are already doing this validation
                )
                try channel.pipeline.syncOperations.addHandler(HTTPSendableResponseChannelHandler())
                return try NIOAsyncChannel<HTTPServerRequestPart, SendableHTTPServerResponsePart>(
                    wrappingChannelSynchronously: channel,
                    configuration: .init()
                )
            }
        }
    await withDiscardingTaskGroup { group in
        do {
            try await asyncChannel.executeThenClose { inbound in
                for try await childChannel in inbound {
                    group.addTask {
                        try? await childChannel.executeThenClose { inbound, outbound in
                            for try await part in inbound {
                                if case .end = part {
                                    let buffer = ByteBuffer(repeating: 0, count: 1000000)
                                    try await outbound.write(.head(.init(version: .http1_1, status: .ok, headers: ["content-length": "1000000"])))
                                    try await outbound.write(.body(buffer))
                                    try await outbound.write(.end(nil))
                                    break
                                }
                            }
                        }
                    }
                }
            }
        } catch {
            print("ERROR: Waiting on child channel: \(error)")
        }
    }

}

if #available(macOS 14, *) {
    try await server()
}