scheung38 opened this issue 4 years ago
With `asyncio.gather`?

Thanks @Natim. Would `asyncio.gather` process, say, 100 independent CSV files in parallel, or at least in a non-blocking manner? For example, hashing 100,000 CSV files with hashlib took 30 minutes, so it would be interesting to see how long this approach takes.
You might want to watch my talk about IO Bound and CPU Bound mixes: https://www.youtube.com/watch?v=eJBbM3RpEUI
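The short version: `asyncio.gather` interleaves coroutines on a single thread, so it only speeds things up while a coroutine is *waiting* (IO); pure-CPU work like hashing still runs one coroutine at a time. A minimal sketch (the 0.2 s delays are arbitrary):

```python
import asyncio
import time

async def fake_io(delay):
    # Simulates IO-bound work: awaiting sleep releases the event loop,
    # so other coroutines can run in the meantime.
    await asyncio.sleep(delay)
    return delay

async def main():
    start = time.perf_counter()
    # Both coroutines wait concurrently: total time is roughly
    # max(delays), not their sum. A CPU-bound body (e.g. hashlib)
    # would not overlap like this.
    await asyncio.gather(fake_io(0.2), fake_io(0.2))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"two 0.2s waits finished in {elapsed:.2f}s")
```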
To elaborate a little bit.
If you don't mind a single-process CSV hasher, you can use aiofile to read your files line by line; the reads will then happen concurrently.
```python
import asyncio
import hashlib

from aiofile import AIOFile, LineReader


async def hashlib_file(filename):
    # Open the file asynchronously
    async with AIOFile(filename, 'rb') as afd:
        # Create the hasher
        hasher = hashlib.sha256()
        async for line in LineReader(afd):
            # Feed each line to the hasher
            hasher.update(line)
        # Return the hex digest along with the filename
        return (hasher.hexdigest(), filename)


async def main():
    FILES = (
        "worker.py",
        "README.md",
    )
    actions = [hashlib_file(f) for f in FILES]
    results = await asyncio.gather(*actions)
    for filehash, filename in results:
        print(f"{filehash}\t{filename}")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```
```
$ sha256sum *
f3c8145d5c50b1b1536fe15561ad1fb6129f5c6e06f97054bdfb8f374ed4682f worker.py
b4b392521258362b79cf1ed7b42ade0308a41a1275a6da21ede4ed03089bfae8 README.md

$ python worker.py
f3c8145d5c50b1b1536fe15561ad1fb6129f5c6e06f97054bdfb8f374ed4682f worker.py
b4b392521258362b79cf1ed7b42ade0308a41a1275a6da21ede4ed03089bfae8 README.md
```
If you want something really fast, you can use `asyncio.create_subprocess_exec` with a unix command such as `sha256sum`:
```python
import asyncio


async def hashlib_file(filename):
    # Delegate the hashing to a sha256sum subprocess
    proc = await asyncio.create_subprocess_exec(
        "sha256sum", filename,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    # sha256sum prints "<digest>  <filename>"
    value, _ = stdout.decode().split()
    return value, filename


async def main():
    FILES = (
        "worker.py",
        "README.md",
    )
    actions = [hashlib_file(f) for f in FILES]
    results = await asyncio.gather(*actions)
    for filehash, filename in results:
        print(f"{filehash}\t{filename}")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```
```
$ python worker_subprocess.py
f3c8145d5c50b1b1536fe15561ad1fb6129f5c6e06f97054bdfb8f374ed4682f worker.py
b4b392521258362b79cf1ed7b42ade0308a41a1275a6da21ede4ed03089bfae8 README.md
```
@scheung38 I added the code in my previous message.
What is the performance of the single-process vs. the multi-process option?
I am going to let you try it on your huge CSV files and tell us; it might be interesting. For small files you won't see a difference, but for huge ones, I am interested.
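If the hashing itself (CPU) turns out to dominate rather than the IO, a plain `concurrent.futures.ProcessPoolExecutor` may beat both asyncio versions, since it actually uses multiple cores. This is a sketch of that alternative, not part of the code above:

```python
import hashlib
from concurrent.futures import ProcessPoolExecutor

def hash_file(filename):
    # Plain blocking code: read the file in chunks and hash it.
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        for chunk in iter(lambda: f.read(65536), b''):
            hasher.update(chunk)
    return hasher.hexdigest(), filename

def hash_files(filenames):
    # One worker process per core; files are hashed truly in parallel.
    with ProcessPoolExecutor() as pool:
        return list(pool.map(hash_file, filenames))
```

Call `hash_files([...])` from code guarded by `if __name__ == '__main__':`, which process pools require on Windows.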
I'm not dealing with one huge CSV file, but maybe 100,000 small CSV files? Appreciate your feedback though...
Using standard non-async:
```python
import hashlib
import time

BLOCK_SIZE = 65536


def hash_csv():
    digests = []
    for i in range(1000):  # to simulate 1000 files
        for filename in ['Y:\\sample.csv']:  # around 75k in size, 38 columns
            hasher = hashlib.blake2s()
            with open(filename, 'rb') as f:
                buf = f.read(BLOCK_SIZE)
                while buf:  # read the whole file, not just the first block
                    hasher.update(buf)
                    buf = f.read(BLOCK_SIZE)
            a = hasher.hexdigest()
            digests.append(a)
            print(a)
    return digests


if __name__ == '__main__':
    start = time.time()
    hash_csv()
    end = time.time()
    total = end - start
    print(total)
```
12.5 sec
Tried your single processing version:
```python
import asyncio
import hashlib
import time

from aiofile import AIOFile, LineReader


async def hashlib_file(filename):
    # Open the file asynchronously
    async with AIOFile(filename, 'rb') as afd:
        # Create the hasher
        hasher = hashlib.blake2s()
        async for line in LineReader(afd):
            # Feed each line to the hasher
            hasher.update(line)
        # Return the hex digest along with the filename
        return (hasher.hexdigest(), filename)


async def main():
    FILES = (
        "Y:\\sample.csv",
    )
    for i in range(1000):
        actions = [hashlib_file(f) for f in FILES]
        results = await asyncio.gather(*actions)
        for filehash, filename in results:
            print(f"{filehash}\t{filename}")


start = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end = time.time()
total = end - start
print(total)
```
71 sec
Tried your multiprocessing version:
```python
import asyncio
import time


async def hashlib_file(filename):
    proc = await asyncio.create_subprocess_exec(
        "sha256sum", filename,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    value, _ = stdout.decode().split()
    return value, filename


async def main():
    FILES = (
        "Y:\\sample.csv",
    )
    actions = [hashlib_file(f) for f in FILES]
    results = await asyncio.gather(*actions)
    for filehash, filename in results:
        print(f"{filehash}\t{filename}")


start = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end = time.time()
total = end - start
print(total)
```
Error: NotImplementedError? Also, I'm not sure why the async version is slower than the sync one?
It seems you are using Windows, so you cannot exec a unix command such as sha256sum there; hence the NotImplementedError.
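A Windows-friendly variant of the same idea (a sketch on my part, not tested on Windows) is to keep the asyncio structure but push the blocking hashlib work into threads with `asyncio.to_thread` (Python 3.9+). Threads share the GIL, so this mostly overlaps the file IO rather than the hashing itself:

```python
import asyncio
import hashlib

def hash_file_blocking(filename):
    # Ordinary blocking read-and-hash, executed off the event loop.
    hasher = hashlib.blake2s()
    with open(filename, 'rb') as f:
        for chunk in iter(lambda: f.read(65536), b''):
            hasher.update(chunk)
    return hasher.hexdigest(), filename

async def main(filenames):
    # to_thread runs each blocking call in the default thread pool,
    # so no unix binary like sha256sum is needed.
    results = await asyncio.gather(
        *(asyncio.to_thread(hash_file_blocking, f) for f in filenames)
    )
    for filehash, filename in results:
        print(f"{filehash}\t{filename}")
    return results

# asyncio.run(main(["Y:\\sample.csv"]))  # path taken from the example above
```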
Your code is wrong for the async test :joy:
```python
FILES = (
    "Y:\\sample.csv",
)
actions = [hashlib_file(FILES[0]) for _ in range(1000)]
results = await asyncio.gather(*actions)
for filehash, filename in results:
    print(f"{filehash}\t{filename}")
```
Is the single-process or the multi-process version wrong?
Thanks!