apple / swift-nio

Event-driven network application framework for high performance protocol servers & clients, non-blocking.

NIOFileSystem: BufferedReader isn't really composable #3011

Open weissi opened 2 days ago

weissi commented 2 days ago

Expected behavior

BufferedReader is probably the best abstraction in NIOFileSystem for streaming through a large amount of data. A real-world example is JSON Lines (JSONL).

I'd expect to be able to write

try await FileSystem.shared.withFileHandle(forReadingAt: "many-gigabytes.jsonl") { file in
    var reader = file.bufferedReader(capacity: .mebibytes(4))
    for try await line in reader.lines {
        let thing = try JSONDecoder().decode(Thing.self, from: line)
        try await process(thing)
    }
}

And for more complex schemes that aren't just newlines, I'd like to be able to write something composable on BufferedReader.
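
For illustration, a small record-splitting helper roughly along these lines would already cover JSONL and similar separator-delimited formats. This is only a hypothetical sketch: readRecord(separatedBy:) is a made-up name, and the (bytes, seenEOF) shape of read(while:) is taken from the workaround under "Actual behavior" below.

import NIO
import _NIOFileSystem

extension BufferedReader {
    // Hypothetical helper: read bytes up to (but not including) the next
    // separator, consume any run of separators, and return nil once the
    // file is exhausted.
    public mutating func readRecord(separatedBy separator: UInt8) async throws -> ByteBuffer? {
        let (record, seenEOF) = try await self.read(while: { $0 != separator })
        if seenEOF && record.readableBytes == 0 {
            return nil
        }
        _ = try await self.read(while: { $0 == separator }) // skip the separator(s)
        return record
    }
}

With something like that, streaming JSONL becomes a plain while let record = try await reader.readRecord(separatedBy: UInt8(ascii: "\n")) loop.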

Actual behavior

Right now I use

try await FileSystem.shared.withFileHandle(forReadingAt: "many-gigabytes.jsonl") { file in
    var reader = file.bufferedReader(capacity: .mebibytes(4))
    while true {
        let (line, seenEOF) = try await reader.read(while: { $0 != UInt8(ascii: "\n") })
        guard !seenEOF else {
            break
        }
        _ = try await reader.read(while: { $0 == UInt8(ascii: "\n") }) // skip newlines
        let thing = try JSONDecoder().decode(Thing.self, from: line)
        try await process(thing)
    }
}

But this is long-winded and easy to get wrong (note the manual EOF and newline handling).

SwiftNIO version/commit hash

2.77.0


FWIW, I also have another helper that lets me write

try await FileSystem.shared.withFileHandle(forReadingAt: "many-gigabytes.jsonl") { file in
    for try await line in file.lines {
        let thing = try JSONDecoder().decode(Thing.self, from: line)
        try await process(thing)
    }
}

but it's very complex:

import NIO
import _NIOFileSystem

public struct AsyncByteBufferLineSequence<Base: Sendable>: AsyncSequence & Sendable
where Base: AsyncSequence, Base.Element == ByteBuffer {
    public typealias Element = ByteBuffer
    private let underlying: Base
    private let dropTerminator: Bool
    private let maximumAllowableBufferSize: Int
    private let dropLastChunkIfNoNewline: Bool

    public struct AsyncIterator: AsyncIteratorProtocol {
        public typealias Element = ByteBuffer
        private var underlying: Base.AsyncIterator
        private let dropTerminator: Bool
        private let maximumAllowableBufferSize: Int
        private let dropLastChunkIfNoNewline: Bool
        private var buffer = Buffer()

        struct Buffer {
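            // Accumulates the chunks read from the underlying sequence and
            // tracks the total number of buffered bytes.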
            private var buffer: [ByteBuffer] = []
            internal private(set) var byteCount: Int = 0

            mutating func append(_ buffer: ByteBuffer) {
                self.buffer.append(buffer)
                self.byteCount += buffer.readableBytes
            }

            func allButLast() -> ArraySlice<ByteBuffer> {
                return self.buffer.dropLast()
            }

            var byteCountButLast: Int {
                return self.byteCount - (self.buffer.last?.readableBytes ?? 0)
            }

            var lastChunkView: ByteBufferView? {
                return self.buffer.last?.readableBytesView
            }

            mutating func concatenateEverything(upToLastChunkLengthToConsume lastLength: Int) -> ByteBuffer {
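                // Joins all complete chunks plus the first `lastLength` bytes of the
                // most recent chunk into a single ByteBuffer; any unconsumed remainder
                // of that chunk stays buffered.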
                var output = ByteBuffer()
                output.reserveCapacity(lastLength + self.byteCountButLast)

                var writtenBytes = 0
                for buffer in self.buffer.dropLast() {
                    writtenBytes += output.writeImmutableBuffer(buffer)
                }
                writtenBytes += output.writeImmutableBuffer(
                    self.buffer[self.buffer.endIndex - 1].readSlice(length: lastLength)!
                )
                if self.buffer.last!.readableBytes > 0 {
                    if self.buffer.count > 1 {
                        self.buffer.swapAt(0, self.buffer.endIndex - 1)
                    }
                    self.buffer.removeLast(self.buffer.count - 1)
                } else {
                    self.buffer = []
                }

                self.byteCount -= writtenBytes
                assert(self.byteCount >= 0)
                return output
            }
        }

        internal init(
            underlying: Base.AsyncIterator,
            dropTerminator: Bool,
            maximumAllowableBufferSize: Int,
            dropLastChunkIfNoNewline: Bool
        ) {
            self.underlying = underlying
            self.dropTerminator = dropTerminator
            self.maximumAllowableBufferSize = maximumAllowableBufferSize
            self.dropLastChunkIfNoNewline = dropLastChunkIfNoNewline
        }

        private mutating func deliverUpTo(
            view: ByteBufferView,
            index: ByteBufferView.Index,
            expectNewline: Bool
        ) -> ByteBuffer {
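            // Emits one line: everything buffered up to `index` in the most recent
            // chunk, including the newline if `expectNewline` is set (and dropping it
            // again if `dropTerminator` is set).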
            let howMany = view.startIndex.distance(to: index) + (expectNewline ? 1 : 0)

            var output = self.buffer.concatenateEverything(upToLastChunkLengthToConsume: howMany)
            if expectNewline {
                assert(output.readableBytesView.last == UInt8(ascii: "\n"))
                assert(
                    output.readableBytesView.firstIndex(of: UInt8(ascii: "\n"))
                        == output.readableBytesView.index(before: output.readableBytesView.endIndex))
            } else {
                assert(output.readableBytesView.last != UInt8(ascii: "\n"))
                assert(!output.readableBytesView.contains(UInt8(ascii: "\n")))
            }
            if self.dropTerminator && expectNewline {
                output.moveWriterIndex(to: output.writerIndex - 1)
            }

            return output
        }

        public mutating func next() async throws -> Element? {
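            // Searches the most recent chunk for a newline and emits a line when one
            // is found, or emits the whole buffer as a partial line once it exceeds
            // `maximumAllowableBufferSize`; otherwise pulls the next chunk. At EOF the
            // remaining bytes form a final line unless `dropLastChunkIfNoNewline` is set.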
            while true {
                if let view = self.buffer.lastChunkView {
                    if let newlineIndex = view.firstIndex(of: UInt8(ascii: "\n")) {
                        return self.deliverUpTo(
                            view: view,
                            index: newlineIndex,
                            expectNewline: true
                        )
                    }

                    if self.buffer.byteCount > self.maximumAllowableBufferSize {
                        return self.deliverUpTo(
                            view: view,
                            index: view.endIndex,
                            expectNewline: false
                        )
                    }
                }

                if let nextBuffer = try await self.underlying.next() {
                    self.buffer.append(nextBuffer)
                } else {
                    if !self.dropLastChunkIfNoNewline, let view = self.buffer.lastChunkView, !view.isEmpty {
                        return self.deliverUpTo(
                            view: view,
                            index: view.endIndex,
                            expectNewline: false
                        )
                    } else {
                        return nil
                    }
                }
            }
        }
    }

    public init(
        _ underlying: Base, dropTerminator: Bool,
        maximumAllowableBufferSize: Int,
        dropLastChunkIfNoNewline: Bool
    ) {
        self.underlying = underlying
        self.dropTerminator = dropTerminator
        self.maximumAllowableBufferSize = maximumAllowableBufferSize
        self.dropLastChunkIfNoNewline = dropLastChunkIfNoNewline
    }

    public func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator(
            underlying: self.underlying.makeAsyncIterator(),
            dropTerminator: self.dropTerminator,
            maximumAllowableBufferSize: self.maximumAllowableBufferSize,
            dropLastChunkIfNoNewline: self.dropLastChunkIfNoNewline
        )
    }
}

extension ReadFileHandle {
    public var lines: AsyncByteBufferLineSequence<FileChunks> {
        return AsyncByteBufferLineSequence(
            self.readChunks(),
            dropTerminator: true,
            maximumAllowableBufferSize: 8 * 1024 * 1024,
            dropLastChunkIfNoNewline: false
        )
    }
}
glbrntt commented 1 day ago

What's the concrete ask here, Johannes: a file.lines() AsyncSequence, or a line reader on BufferedReader?

weissi commented 1 day ago

What's the concrete ask here, Johannes: a file.lines() AsyncSequence, or a line reader on BufferedReader?

Both: a lines AsyncSequence and general composability of BufferedReader, so I can write things like lines more easily.
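
To illustrate how composability would make lines easy, here is a rough sketch (LineSequence is a made-up name; the bufferedReader(capacity:) and read(while:) calls are the ones used in the snippets above, and BufferedReader's handle parameter is assumed to be constrained to ReadableFileHandleProtocol):

import NIO
import _NIOFileSystem

struct LineSequence<Handle: ReadableFileHandleProtocol>: AsyncSequence {
    typealias Element = ByteBuffer

    let handle: Handle
    let capacity: ByteCount

    struct AsyncIterator: AsyncIteratorProtocol {
        var reader: BufferedReader<Handle>

        mutating func next() async throws -> ByteBuffer? {
            // Read up to the next newline, then consume the newline(s).
            let (line, seenEOF) = try await self.reader.read(while: { $0 != UInt8(ascii: "\n") })
            if seenEOF && line.readableBytes == 0 {
                return nil
            }
            _ = try await self.reader.read(while: { $0 == UInt8(ascii: "\n") })
            return line
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(reader: self.handle.bufferedReader(capacity: self.capacity))
    }
}

Inside withFileHandle this would be used as for try await line in LineSequence(handle: file, capacity: .mebibytes(4)).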

FranzBusch commented 1 day ago

I understand your ask here. IMO we should start by providing an AsyncSequence-based API for now, since AsyncSequence is the standard way to do asynchronous streaming at this point.

I personally feel that general AsyncReader and AsyncWriter protocols would be very useful and something we might want to introduce at the NIO level, though they are arguably standard library concepts.
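
Purely as an illustration of that last point (all names here are made up; none of this exists in NIO or the standard library today), such protocols might look roughly like this:

// Hypothetical sketch only.
public protocol AsyncReader {
    associatedtype Bytes: RandomAccessCollection where Bytes.Element == UInt8

    /// Returns up to `count` bytes, or nil once the end of the input is reached.
    mutating func read(upTo count: Int) async throws -> Bytes?
}

public protocol AsyncWriter {
    /// Writes the given bytes, suspending until they have been accepted.
    mutating func write(contentsOf bytes: some Sequence<UInt8>) async throws

    /// Flushes any buffered bytes to the underlying destination.
    mutating func flush() async throws
}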