mosquito / aiofile

Real asynchronous file operations with asyncio support.
476 stars 29 forks source link

aiofile LineReader does a read for every line in spite of having multiple lines in CHUNK_SIZE #80

Open fredgido opened 1 year ago

fredgido commented 1 year ago

Long story short

LineReader is very slow

Expected behavior

LineReader is as fast as normal line reading. Read Chunk size actually prevents extra reads.

Actual behavior

LineReader is slow, takes many ms per line. LineReader causes a read for each line.

Steps to reproduce

import asyncio
import functools
import time

import aiofile

def print_on_call_decorator(func):
    def wrapper_decorator(*args, **kwargs):
        print("real read called")
        value = func(*args, **kwargs)
        return value

    return wrapper_decorator

aiofile.AIOFile.read_bytes = print_on_call_decorator(aiofile.AIOFile.read_bytes)

async def main():
    async with aiofile.AIOFile("test_line_iter_file", "r") as f:
        last_line_time = time.perf_counter()
        async for line in aiofile.LineReader(f, chunk_size=aiofile.LineReader.CHUNK_SIZE * 16*16):
            # print("line_time", time.perf_counter() - last_line_time)
            last_line_time = time.perf_counter()
            # print(line, end="")

if __name__ == "__main__":
    open("test_line_iter_file", "w").write("\n".join(str(i) for i in range(1000000)))

Additional info

Sync version to compare:

import time

open("test_line_iter_file", "w").write("\n".join(str(i) for i in range(100000)))
start = time.perf_counter()
with open("test_line_iter_file", "r", buffering=4192 * 16) as f:
    last_line_time = time.perf_counter()
    for line in f:
        # print("line_time", time.perf_counter() - last_line_time)
        last_line_time = time.perf_counter()
        # print(line, end="")

print("end_time", time.perf_counter() - start)

My temporary solution that only works for python approved new lines from the file __iter__, its only twice as slow as sync version:

import asyncio
import asyncio
import functools
import io
import itertools
import time
from typing import Union, Self

import aiofile

class CustomLineReader(
    CHUNK_SIZE = 4192

    def __init__(
        aio_file: aiofile.AIOFile,
        offset: int = 0,
        chunk_size: int = CHUNK_SIZE,
        line_sep: str = "\n",
        self.__reader = aiofile.Reader(aio_file, chunk_size=chunk_size, offset=offset)

        self._buffer = None

        self.linesep = aio_file.encode_bytes(line_sep) if aio_file.mode.binary else line_sep

        self.chunk_iterator = None
        self.last_read = None

    async def setup_buffer(self, buffer_initialization=None):
        chunk = await self.__reader.read_chunk()
        if not chunk:
            raise StopAsyncIteration(chunk)

        if self._buffer:
            del self._buffer
        self._buffer = io.BytesIO() if self.__reader.file.mode.binary else io.StringIO()
        if buffer_initialization:


        self.chunk_iterator = self._buffer.__iter__()

    async def __anext__(self) -> Union[bytes, str]:
        if not self._buffer:
            await self.setup_buffer()
            self.last_read = next(self.chunk_iterator)
            if self.last_read[-1] != "\n":
                await self.setup_buffer(self.last_read)
                self.last_read = next(self.chunk_iterator)
        except StopIteration:
            await self.setup_buffer(self.last_read)
            self.last_read = next(self.chunk_iterator)
        return self.last_read

    def __aiter__(self) -> Self:
        return self
mosquito commented 1 year ago

Quick fix:

from aiofile.utils import LineReader

# 1 megabyte chunks
LineReader.CHUNK_SIZE = 2 ** 20

For complete fix the some time or help is wanted.

fredgido commented 1 year ago

Pretty sure this still does a read of the size chunk size for each line anyway so increasing only makes slower