MKuranowski / aiocsv

Python: Asynchronous CSV reading/writing
https://pypi.org/project/aiocsv/
MIT License
67 stars · 9 forks

`AsyncWriter` results are inaccurate when multiple coroutines write concurrently #7

Closed · xiaoxinmiao closed this issue 2 years ago

xiaoxinmiao commented 2 years ago

When the Semaphore value is set to 1, the number of records stored in result.csv is correct. When it is set to 10, the number of records is incorrect.

raw.csv

from asyncio import Semaphore, gather, get_event_loop
from typing import List, Set, Tuple

import aiofiles
import numpy as np
from aiocsv import AsyncDictReader, AsyncWriter

async def load_raw_data():
    async with aiofiles.open(f'raw.csv', 'r', encoding="utf-8-sig", newline="") as f:
        raw_list = []
        async for row in AsyncDictReader(f):
            raw_list.append(row)
        return raw_list

async def split_data():
    raw_data = await load_raw_data()

    split_rule = [190, 174, 257, 225, 229, 234, 250, 239, 290, 268, 258, 251, 247, 215, 268, 288, 234, 260, 285, 276, 282, 285, 278, 274, 292, 283, 283, 290, 228,
                  285, 288, 296, 272, 272, 275, 268, 290, 293, 286, 269, 297, 279, 281, 271, 223, 173, 244, 137, 260, 194, 246, 268, 282, 289, 230, 258, 294, 229, 206, 283, 281, 263, 280, 246, 282, 287, 294, 285, 300, 297, 296, 294, 296, 293, 299, 297, 290, 226, 258, 253, 274, 290, 284, 295, 280, 291, 279, 300, 230, 283, 292, 288, 287, 216, 277, 275, 275, 297, 299, 247, 213, 88, 111, 101, 217, 96, 74, 11]
    assert len(raw_data) == 27558

    new_data = []
    start = 0
    for i in split_rule:
        new_data.append(raw_data[start:start+i])
        start += i

    split_len = []
    for i in new_data:
        split_len.append(len(i))
    assert split_rule == split_len
    return new_data

async def get_one(small_list: List):
    try:
        async with aiofiles.open(f'result.csv', 'a', encoding="utf-8-sig", newline="") as f:
            data = [[i['uservip_degree'], i['timestamp'], i['content'],
                     i['opername'], i['upcount']] for i in small_list]
            writer = AsyncWriter(f)
            await writer.writerows(data)

    except Exception as e:
        print(e)

async def trunks(sem: Semaphore, small_list: List):
    async with sem:
        await get_one(small_list)

async def get_group(sem: Semaphore, data: List) -> Tuple[Set[int], np.ndarray]:
    tasks = []
    for i in data:
        tasks.append(trunks(sem, i))
    await gather(*tasks)

async def main():
    new_data = await split_data()
    async with aiofiles.open(f'result.csv', 'w', encoding="utf-8-sig", newline="") as f:
        f.truncate()
        writer = AsyncWriter(f)
        await writer.writerow(['uservip_degree', 'timestamp', 'content', 'opername', 'upcount'])
    # When the parameter of `Semaphore` is set to 1, the number of records stored in `result.csv` is correct.
    # When it is set to 10, the number of records is incorrect.
    sem = Semaphore(10)
    await get_group(sem, new_data)
    pass

if __name__ == '__main__':
    get_event_loop().run_until_complete(main())

https://github.com/relax-space/aiocsv-test

MKuranowski commented 2 years ago

Please reduce the example to the smallest possible one that reproduces the issue. I have no time to figure out what split_data does.

xiaoxinmiao commented 2 years ago

Sorry, that was careless of me; I hadn't prepared the data well. I have simplified the code and reduced the original data. Here is the test result: the input has 20 rows, but after saving the CSV, 6 rows are missing.

raw.csv

from asyncio import Semaphore, gather, get_event_loop
from typing import List, Set, Tuple

import aiofiles
import numpy as np
from aiocsv import AsyncDictReader, AsyncWriter

async def load_raw_data():
    async with aiofiles.open(f'raw.csv', 'r', encoding="utf-8-sig", newline="") as f:
        raw_list = []
        async for row in AsyncDictReader(f):
            raw_list.append(row)
        return raw_list

async def split_data():
    raw_data = await load_raw_data()
    # Each small list contains 10 rows
    n = 10
    new_data = [raw_data[i:i+n] for i in range(0, len(raw_data), n)]
    return new_data

async def get_one(small_list: List):
    try:
        async with aiofiles.open(f'result.csv', 'a', encoding="utf-8-sig", newline="") as f:
            data = [[i['uservip_degree'], i['timestamp'], i['content'],
                     i['opername'], i['upcount']] for i in small_list]
            writer = AsyncWriter(f)
            await writer.writerows(data)

    except Exception as e:
        print(e)

async def trunks(sem: Semaphore, small_list: List):
    async with sem:
        await get_one(small_list)

async def get_group(sem: Semaphore, data: List) -> Tuple[Set[int], np.ndarray]:
    tasks = []
    for i in data:
        tasks.append(trunks(sem, i))
    await gather(*tasks)

async def main():
    new_data = await split_data()
    async with aiofiles.open(f'result.csv', 'w', encoding="utf-8-sig", newline="") as f:
        f.truncate()
        writer = AsyncWriter(f)
        await writer.writerow(['uservip_degree', 'timestamp', 'content', 'opername', 'upcount'])
    # When the parameter of `Semaphore` is set to 1, the number of records stored in `result.csv` is correct.
    # When it is set to 10, the number of records is incorrect.
    sem = Semaphore(10)
    await get_group(sem, new_data)
    pass

if __name__ == '__main__':
    get_event_loop().run_until_complete(main())

Raw data:

uservip_degree,timestamp,content,opername,upcount
3,225,徐律!啊啊啊啊,,0
0,225,不想当老板的爱豆不是好演员,52HZ,0
0,225,看着真没有文化,刘芮煊,0
5,225,哈哈哈哈哈哈穿的好!!!,,4
3,225,哈哈哈哈他一站起来我爆笑哈哈哈哈,,0
0,225,双北果然开心,ten,0
0,225,不想当老板的爱豆不是好演员,Stars、,0
4,225,站起来,我笑了,,0
0,225,肿裁哈哈哈哈哈,,2
1,225,哈哈哈哈哈哈哈哈哈,,0
3,225,哈哈哈哈哈 以为是件普通西装,兰花.,0
0,225,这个衣服有点🆒,shine!☆☆☆,1
0,225,不愧是我山东人,总是想着当领导,,咕咕咕咕咕嗝…,0
1,225,这一季的嘉宾都太让人喜欢了😘,Snail . r 🗝,3
4,225,坐着的时候范泽言站起来秒变福西西哈哈哈哈哈,温妤妤🍊,1
0,225,看着真没有文化,AM,0
4,225,哈哈哈哈哈 以为是件普通西装,,0
0,225,哈哈哈哈,卤鱼丸最好吃,0
0,225,哈哈哈哈哈 以为是件普通西装,遇见未知的自己,0
1,225,哈哈哈丞丞一开口就老搞笑了,,0

Data stored in result.csv:

uservip_degree,timestamp,content,opername,upcount
3,225,徐律!啊啊啊啊,,0
0,225,不想当老板的爱豆不是好演员,52HZ,0
0,225,看着真没有文化,刘芮煊,0
5,225,哈哈哈哈哈哈穿的好!!!,,4
3,225,哈哈哈哈他一站起来我爆笑哈哈哈哈,,0
0,225,双北果然开心,ten,0
0,225,不想当老板的爱豆不是好演员,Stars、,0
4,225,站起来,我笑了,,0
0,225,肿裁哈哈哈哈哈,,2
1,225,哈哈哈哈哈哈哈哈哈,,0
0
0,225,哈哈哈哈,卤鱼丸最好吃,0
0,225,哈哈哈哈哈 以为是件普通西装,遇见未知的自己,0
1,225,哈哈哈丞丞一开口就老搞笑了,,0
MKuranowski commented 2 years ago

Can't reproduce*; I always get 20 rows regardless of how many coroutines are allowed to concurrently append to result.csv.

Also, the code is still not minimal, has incorrect type hints, and is overall confusing (get_one, but it doesn't retrieve anything?). It doesn't await f.truncate() in main, and it uses get_event_loop when no event loop is running.
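
For reference, a minimal sketch of what those two calls should look like (assuming result.csv already exists):

import asyncio
import aiofiles

async def main() -> None:
    # "r+" opens the existing file at position 0, so truncate() empties it.
    async with aiofiles.open("result.csv", mode="r+", newline="") as f:
        # aiofiles proxies blocking file methods as coroutines,
        # so truncate() returns an awaitable that must be awaited:
        await f.truncate()

if __name__ == "__main__":
    # asyncio.run() creates and closes its own event loop, unlike
    # get_event_loop(), which assumes a loop is already set up:
    asyncio.run(main())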

I've distilled your code to the following:

from typing import Any, Iterable
import asyncio
import aiofiles
import aiocsv

Row = Iterable[Any]

async def write_chunk_to_file(sem: asyncio.Semaphore, chunk: list[Row]) -> None:
    async with sem:
        async with aiofiles.open("test.csv", mode="a", newline="") as f:
            await aiocsv.AsyncWriter(f).writerows(chunk)

async def main() -> None:
    # Prepare the data to write in batches
    data_chunks: list[list[Row]] = [
        [("Berlin", "Germany"), ("Madrid", "Spain"), ("Rome", "Italy")],
        [("Washington D.C.", "USA"), ("Ottawa", "Canada"), ("Mexico City", "Mexico")],
        [("Tokyo", "Japan"), ("Beijing", "China"), ("Manila", "Philippines")],
    ]

    # Truncate existing file + write the header
    async with aiofiles.open("test.csv", mode="w", newline="") as f:
        await aiocsv.AsyncWriter(f).writerow(["city", "country"])

    # Launch coroutines to write batches to the file
    sem = asyncio.Semaphore(2)
    await asyncio.gather(*[write_chunk_to_file(sem, chunk) for chunk in data_chunks])

if __name__ == "__main__":
    asyncio.run(main())

* both on your code and my distilled example

xiaoxinmiao commented 2 years ago

With your code, data is still missing.

These are the versions of the referenced packages:

PS D:\1.source\pythonpath\chanmama> pip list
Package               Version
--------------------- --------------
aiocsv                1.2.1
aiofiles              0.8.0

In addition, my Windows information is as follows:

OS Name:          Microsoft Windows 10 Professional
OS Version:       10.0.19044 N/A Build 19044

[attached animation: 动画1]

xiaoxinmiao commented 2 years ago

The result is correct when running on Linux.

MKuranowski commented 2 years ago

Ugh, Windows. The race happens even without aiocsv (hence I'm closing the issue):

from typing import Any, Iterable
import asyncio
import aiofiles

Row = Iterable[str]

async def write_chunk_to_file(sem: asyncio.Semaphore, chunk: list[Row]) -> None:
    async with sem:
        async with aiofiles.open("test.csv", mode="a", newline="") as f:
            await f.write(
                "\r\n".join(",".join(row) for row in chunk) + "\r\n"
            )

async def main() -> None:
    # Prepare the data to write in batches
    data_chunks: list[list[Row]] = [
        [("Berlin", "Germany"), ("Madrid", "Spain"), ("Rome", "Italy")],
        [("Washington D.C.", "USA"), ("Ottawa", "Canada"), ("Mexico City", "Mexico")],
        [("Tokyo", "Japan"), ("Beijing", "China"), ("Manila", "Philippines")],
    ]

    # Truncate existing file + write the header
    async with aiofiles.open("test.csv", mode="w", newline="") as f:
        await f.write("city,country\r\n")

    # Launch coroutines to write batches to the file
    sem = asyncio.Semaphore(2)
    await asyncio.gather(*[write_chunk_to_file(sem, chunk) for chunk in data_chunks])

if __name__ == "__main__":
    asyncio.run(main())

I only encounter the issue once in about 10 runs, but the code allows multiple coroutines to write to the file concurrently, which is by design susceptible to races. I guess Linux's I/O stack handles concurrent appends better than Windows's.
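
If you do need multiple producer coroutines, a minimal sketch of a workaround (functionally the same as your Semaphore(1)) is to serialize the whole open-write-close sequence behind an asyncio.Lock:

from typing import Any, Iterable
import asyncio
import aiofiles
import aiocsv

Row = Iterable[Any]

async def write_chunk_to_file(lock: asyncio.Lock, chunk: list[Row]) -> None:
    # Holding the lock across open -> write -> close makes appends
    # strictly sequential, so chunks can no longer interleave.
    async with lock:
        async with aiofiles.open("test.csv", mode="a", newline="") as f:
            await aiocsv.AsyncWriter(f).writerows(chunk)

async def main() -> None:
    data_chunks: list[list[Row]] = [
        [("Berlin", "Germany"), ("Madrid", "Spain"), ("Rome", "Italy")],
        [("Tokyo", "Japan"), ("Beijing", "China"), ("Manila", "Philippines")],
    ]

    # Truncate existing file + write the header
    async with aiofiles.open("test.csv", mode="w", newline="") as f:
        await aiocsv.AsyncWriter(f).writerow(["city", "country"])

    lock = asyncio.Lock()
    await asyncio.gather(*[write_chunk_to_file(lock, chunk) for chunk in data_chunks])

if __name__ == "__main__":
    asyncio.run(main())

This gives up the write concurrency, but only one coroutine touches the file at any moment, so rows cannot be lost or interleaved.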

xiaoxinmiao commented 2 years ago

Thank you very much for your answer. It is exactly as you described; I have submitted an issue to aiofiles:

https://github.com/Tinche/aiofiles/issues/135