MKuranowski / aiocsv

Python: Asynchronous CSV reading/writing
https://pypi.org/project/aiocsv/
MIT License
67 stars · 9 forks

AsyncDictReader and AsyncReader do not release memory while iterating over the reader #26

Closed jyates-om1 closed 6 months ago

jyates-om1 commented 6 months ago

Summary

When iterating over AsyncDictReader or AsyncReader, we are seeing large amounts of memory being used. After introducing a memory profiler, it looks like aiocsv continues to consume memory in amounts proportional to the size of the input file.

Background

We have been using the aiocsv library for transforming data from large CSV files, and it has been awesome to use! Some of our files are huge (7 - 20 GB), so we want to be as memory-efficient as possible when reading them. We've noticed that AsyncDictReader and AsyncReader both steadily consume more memory while iterating over the CSV using async for row in reader. The memory is only released when the context manager closes.

I've been poking around in the aiocsv code a bit, but haven't had any success figuring out where the memory leak occurs, other than tracemalloc pointing at /usr/local/lib/python3.11/site-packages/aiocsv/readers.py:43. I'd be happy to contribute a PR if I find the source.

Sample Code

import asyncio
import tracemalloc
from typing import Dict, List

import aiofiles
from aiocsv import AsyncDictReader

input_file_path = "customers-100000.csv"
csv_row_batch_size = 10

def memory_stats(condition: str):
    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics("lineno")

    print(f"[ Memory: {condition} ]")
    for stat in top_stats[:10]:
        print(stat)

async def main():
    rows: List[Dict[str, str]] = []
    chunk_id = 1

    # set up memory tracing
    tracemalloc.start()

    memory_stats("start")

    async with aiofiles.open(
        input_file_path,
        mode="r",
    ) as input_file:
        async for row in AsyncDictReader(input_file):

            rows.append(row)

            # once we have enough rows to make a chunk, write a message and clear the rows
            if len(rows) == csv_row_batch_size:
                print(
                    f"{input_file_path}:{chunk_id}: read csv chunk of {len(rows)} rows",
                )

                # increment chunk id
                chunk_id += 1

                # clear the rows
                rows.clear()

                # output memory stats
                memory_stats(
                    f"chunk / {chunk_id}",
                )

        if len(rows) > 0:
            # handle any remaining rows for final chunk
            print(
                f"{input_file_path}:{chunk_id}: read csv chunk of {len(rows)} rows",
            )

    memory_stats("end")

asyncio.run(main())

Sample memory profiler output

[ Memory: chunk / 263 ]
/usr/local/lib/python3.11/site-packages/aiocsv/readers.py:43: size=2397 KiB, count=36687, average=67 B
<frozen importlib._bootstrap_external>:729: size=36.8 KiB, count=294, average=128 B
/usr/local/lib/python3.11/concurrent/futures/thread.py:58: size=29.3 KiB, count=18, average=1666 B
/usr/local/lib/python3.11/tracemalloc.py:67: size=8960 B, count=140, average=64 B
/usr/local/lib/python3.11/tracemalloc.py:558: size=7024 B, count=146, average=48 B
<frozen importlib._bootstrap>:241: size=4839 B, count=40, average=121 B
/usr/local/lib/python3.11/queue.py:258: size=3394 B, count=12, average=283 B
/usr/local/lib/python3.11/site-packages/aiocsv/readers.py:98: size=2696 B, count=36, average=75 B
/usr/local/lib/python3.11/queue.py:28: size=2643 B, count=11, average=240 B
/usr/local/lib/python3.11/queue.py:223: size=2605 B, count=11, average=237 B

Environment info

versions: python=3.11.4 aiocsv==1.3.1 aiofiles==23.2.1

os: debian 10 (buster), run via docker

MKuranowski commented 6 months ago

Thanks for the detailed report, I'll take a look at it in the following days.

If AsyncReader also leaks memory, it's possible that the leak is in the C part of the library. I'm unsure whether tracemalloc would correctly pick up stack frames from C without debug symbols; possibly memray, alongside an aiocsv build with debug symbols, would show more accurate locations of the leaked memory sources.
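(Side note on profiling technique: besides per-snapshot statistics, tracemalloc can also diff two snapshots, which makes steady growth between chunks easier to confirm. A minimal sketch with a deliberately leaky list, purely illustrating `Snapshot.compare_to` and not aiocsv code:)

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# simulate unbounded growth: keep strings alive in a list that is never cleared
leaked = [str(i) * 50 for i in range(10_000)]

after = tracemalloc.take_snapshot()

# compare_to() reports the net allocation change per source line,
# sorted largest-first, so a genuine leak floats to the top
for stat in after.compare_to(before, "lineno")[:3]:
    print(stat)
```

Calling this between chunks (diffing against the previous chunk's snapshot rather than the start) shows whether the per-chunk growth is constant, which is the signature of a leak proportional to input size.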

jyates-om1 commented 6 months ago

Got you, I can play around with it some after work hours as well. I haven't touched C since college though, so no promises!

MKuranowski commented 6 months ago

Fixed in a2ca38ccd478e35b64fc6d5d74a484a28f72e993

jyates-om1 commented 6 months ago

Thanks so much for taking a look! I'll keep an eye out for a PyPI release, but I'd be happy to validate an RC build if you want.

MKuranowski commented 6 months ago

If you really want to validate the changes, build the library from source and run with that: python -m build builds a wheel, and pip install /path/to/aiocsv compiles and installs the library into the current environment (provided you have Python headers and a C compiler installed). I won't be sharing pre-compiled wheels for non-release commits, as it's generally impossible to cross-compile them locally; wheels are automatically built for releases by cibuildwheel, and I don't want to waste runner minutes on irrelevant builds.
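For reference, that build-from-source route boils down to something like the following (a sketch, assuming a C compiler and Python development headers are available; the clone URL is inferred from the repository name):

```shell
# assumes a C compiler and Python headers are installed
git clone https://github.com/MKuranowski/aiocsv.git
cd aiocsv
git checkout a2ca38ccd478e35b64fc6d5d74a484a28f72e993  # the fix commit

# either build a wheel...
python -m build

# ...or compile and install directly into the current environment
pip install .
```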

MKuranowski commented 6 months ago

I've released 1.3.2, pre-built wheels should be available in ~15 minutes.

jyates-om1 commented 6 months ago

Thanks for the help! I grabbed 1.3.2 via PyPI and the same code now runs without a memory leak.