MKuranowski / aiocsv

Python: Asynchronous CSV reading/writing
https://pypi.org/project/aiocsv/
MIT License

Support: Reading from AsyncIterator #27

Open gregbrowndev opened 1 month ago

gregbrowndev commented 1 month ago

Hi, thanks for the work on this library.

I saw in the docs that aiocsv only supports reading from file-like objects that implement the WithAsyncRead protocol. However, it would be nice to be able to read from an AsyncIterator as well, for feature parity with the standard csv module, which can read from any iterable of strings.
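For comparison, here is what that parity looks like in the standard library. `csv.reader` and `csv.DictReader` accept any iterable of strings, such as a list or a generator, not only open files. A minimal illustration with made-up sample data:

```python
import csv

# Any iterable of strings works as input, not only file objects.
lines = ["name,qty\n", "apple,3\n", "pear,5\n"]

# A plain list of lines:
rows = list(csv.reader(lines))

# A generator works just as well:
gen_rows = list(csv.DictReader(line for line in lines))
```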

My specific use case is reading a CSV file from S3, using the aioboto3 library to stream it with iter_lines or iter_chunks.

Example:

```python
import asyncio

import aioboto3
from aiocsv import AsyncDictReader


async def main():
    session = aioboto3.Session()
    async with session.client("s3") as s3:
        response = await s3.get_object(Bucket="my-bucket", Key="data.csv")
        # Decode each streamed line from bytes to str.
        lines_iter = (
            line.decode("utf-8")
            async for line in response["Body"].iter_lines()
        )

        # Desired API: aiocsv does not currently accept an async iterator here.
        async for row in AsyncDictReader(lines_iter, delimiter=","):
            print(row)


if __name__ == "__main__":
    asyncio.run(main())
```

I imagine you could write something to adapt the async iterator to aiocsv's async read() API, but I didn't know whether this could be done efficiently without buffering the whole response.
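One way such an adapter could look, sketched under assumptions: the class name `AsyncIterableTextReader` is hypothetical (it is not part of aiocsv), and the sketch assumes aiocsv only needs an `async read(size)` method that returns at most `size` characters and returns `""` only at end of stream. It keeps just the unread remainder of the current chunk, so the full response is never held in memory.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator


class AsyncIterableTextReader:
    """Hypothetical adapter: exposes an AsyncIterable[str] via async read(size).

    Assumes size > 0. Only the unread tail of the current chunk is buffered.
    """

    def __init__(self, source: AsyncIterable[str]) -> None:
        self._it: AsyncIterator[str] = source.__aiter__()
        self._buffer = ""
        self._exhausted = False

    async def read(self, size: int) -> str:
        # Pull chunks until there is something to return or the source ends.
        # Empty chunks are skipped, since returning "" would signal EOF.
        while not self._buffer and not self._exhausted:
            try:
                self._buffer = await self._it.__anext__()
            except StopAsyncIteration:
                self._exhausted = True
        # Hand out at most `size` characters; keep the rest for the next call.
        data, self._buffer = self._buffer[:size], self._buffer[size:]
        return data


async def _demo() -> list[str]:
    async def lines():
        for line in ["a,b\n", "", "1,2\n"]:
            yield line

    reader = AsyncIterableTextReader(lines())
    out = []
    while chunk := await reader.read(3):
        out.append(chunk)
    return out


chunks = asyncio.run(_demo())
```

With aiocsv, this would presumably be used as `AsyncDictReader(AsyncIterableTextReader(lines_iter))`. One caveat: line iterators that drop line terminators, such as botocore's `iter_lines()` with its default `keepends=False`, would need `"\n"` re-appended to each line first, otherwise consecutive rows run together.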

I would be grateful to see this feature considered in the library.

Cheers!

MKuranowski commented 1 month ago

Isn't response["Body"] already a file-like object?

Regardless, yes, there should be support for AsyncIterable[str], but usually there's a file-like object available anyway, so I haven't really prioritized this API mismatch.

gregbrowndev commented 1 month ago

Thanks a lot, you're right; I missed that by focusing on the iter_lines method. For anyone else trying to do the same thing, the following code now works. I used your answer here to handle the decoding: https://github.com/MKuranowski/aiocsv/issues/2

```python
import asyncio
import codecs

import aioboto3
from aiocsv import AsyncDictReader
from aiocsv.protocols import WithAsyncRead


class AsyncTextReaderWrapper(WithAsyncRead):
    """Wraps an async binary reader (e.g. an S3 StreamingBody) and decodes it to str."""

    def __init__(self, obj, encoding: str, errors: str = "strict"):
        self.obj = obj
        # An incremental decoder handles multi-byte characters split across chunks.
        decoder_factory = codecs.getincrementaldecoder(encoding)
        self.decoder = decoder_factory(errors)

    async def read(self, size: int) -> str:
        while True:
            raw_data = await self.obj.read(size)

            if not raw_data:
                # End of stream: flush any buffered bytes; "" signals EOF.
                return self.decoder.decode(b"", final=True)

            text = self.decoder.decode(raw_data, final=False)
            # A chunk holding only part of a multi-byte character decodes to "",
            # which would be mistaken for EOF, so keep reading instead.
            if text:
                return text


async def main():
    session = aioboto3.Session()
    async with session.client("s3") as s3:
        response = await s3.get_object(Bucket="my-bucket", Key="data.csv")

        f = AsyncTextReaderWrapper(response["Body"], encoding="utf-8")

        async for row in AsyncDictReader(f, delimiter=","):
            print(row)


if __name__ == "__main__":
    asyncio.run(main())
```
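The wrapper above uses `codecs.getincrementaldecoder` rather than calling `bytes.decode()` on each chunk because a fixed-size read can end in the middle of a multi-byte character. A small stdlib-only demonstration, using an arbitrary example string:

```python
import codecs

# "é" is two bytes in UTF-8 (0xC3 0xA9); split them across two chunks,
# as can happen when a stream is read in fixed-size pieces.
data = "café\n".encode("utf-8")
chunks = [data[:4], data[4:]]  # b"caf\xc3", b"\xa9\n"

# Naive per-chunk decoding fails on the split character.
try:
    "".join(chunk.decode("utf-8") for chunk in chunks)
    naive_ok = True
except UnicodeDecodeError:
    naive_ok = False

# An incremental decoder holds the dangling byte until the next chunk arrives.
decoder = codecs.getincrementaldecoder("utf-8")("strict")
parts = [decoder.decode(chunk, final=False) for chunk in chunks]
parts.append(decoder.decode(b"", final=True))  # flush at end of stream
text = "".join(parts)
```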