Open DaaS-20xx opened 3 years ago
Figured out that, according to the details here: https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/, it's not possible to implement parallelism on Lambda with multiprocessing.Pool and multiprocessing.Queue (from the multiprocessing module, only Pipe can be used). And from the stack trace, multiprocessing.pool is invoked when aiofile.AIOFile is opened. Is there an alternative to that?
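For context, the pattern the AWS blog post recommends (Process plus Pipe instead of Pool/Queue) can be sketched like this; the worker and function names below are illustrative, not taken from the script in this thread:

```python
from multiprocessing import Process, Pipe

def worker(conn, value):
    # The child sends its result through the pipe instead of a shared Queue,
    # avoiding the SemLock that Lambda (which lacks /dev/shm) cannot create.
    conn.send(value * 2)
    conn.close()

def run_parallel(values):
    # One (pipe, process) pair per task; Pipe works on Lambda, Pool/Queue do not.
    pairs = []
    for v in values:
        parent_conn, child_conn = Pipe()
        p = Process(target=worker, args=(child_conn, v))
        p.start()
        pairs.append((parent_conn, p))
    results = [conn.recv() for conn, _ in pairs]
    for _, p in pairs:
        p.join()
    return results
```

This trades the convenience of Pool.map for primitives that do not require POSIX semaphores, which is exactly the restriction the blog post describes.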
Hi everyone, I just figured out how to fix the issue. As explained here: https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/, AWS Lambda does not support multiprocessing.pool, which is used by the aiofile library (specifically, multiprocessing.pool.ThreadPool is invoked in caio/python_aio.py), and this causes the error I reported above. I then found the same issue in this discussion about AWS Lambda not working with the Snowflake connector: https://github.com/snowflakedb/snowflake-connector-python/issues/287#. In that case the issue was fixed in a new release of the snowflake-connector-python library, simply by replacing multiprocessing.pool.ThreadPool with concurrent.futures.thread.ThreadPoolExecutor. I applied exactly the same fix manually to python_aio.py inside the caio folder, i.e.:
`from concurrent.futures.thread import ThreadPoolExecutor` ... `self.pool = ThreadPoolExecutor(pool_size)`
Then I zipped everything, redeployed the AWS Lambda function, and the issue was fixed!
Anyway, I assume this fix should be addressed officially, e.g. as a dedicated accommodation for AWS Lambda.
Thanks everybody!
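For what it's worth, the reason this swap can work on Lambda is that ThreadPoolExecutor is built on plain threading primitives rather than multiprocessing SemLocks. A minimal sanity check (not from the original report) would be:

```python
from concurrent.futures.thread import ThreadPoolExecutor

# ThreadPoolExecutor queues work with queue.SimpleQueue and threading locks,
# so it never touches the POSIX semaphores that fail on AWS Lambda.
pool = ThreadPoolExecutor(max_workers=4)
future = pool.submit(pow, 2, 10)
result = future.result()  # 1024
pool.shutdown(wait=True)
```

Note, however, that ThreadPoolExecutor is not API-compatible with multiprocessing.pool.ThreadPool: it exposes submit() rather than apply_async(), which matters for the follow-up error below.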
Unfortunately, moving forward, another issue showed up when running the Python script in Lambda, during the opening of the file. It is triggered by the ThreadPoolExecutor I had put in place of ThreadPool in caio/python_aio.py:
[ERROR] AttributeError: 'ThreadPoolExecutor' object has no attribute 'apply_async'
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 61, in lambda_handler
    return loop.run_until_complete(basic_transcribe())
  File "/var/lang/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/var/task/lambda_function.py", line 51, in basic_transcribe
    await asyncio.gather(write_chunks(), handler.handle_events())
  File "/var/task/lambda_function.py", line 45, in write_chunks
    async for chunk in reader:
  File "/var/task/aiofile/utils.py", line 77, in __anext__
    chunk = await self.read_chunk()
  File "/var/task/aiofile/utils.py", line 64, in read_chunk
    chunk = await self.file.read_bytes(
  File "/var/task/aiofile/aio.py", line 220, in read_bytes
    return await self.__context.read(size, self.fileno(), offset)
  File "/var/task/caio/asyncio_base.py", line 47, in submit
    assert self.context.submit(op) == 1, "Operation was not submitted"
  File "/var/task/caio/python_aio.py", line 128, in submit
    self._execute(operation)
  File "/var/task/caio/python_aio.py", line 69, in _execute
    self.pool.apply_async(
So it seems the issue is with opening the file in async/non-blocking mode in AWS Lambda, apparently in this piece of code:
async with aiofile.AIOFile('/tmp/test.wav', 'rb') as afp:
Any suggestion or hint on how this can be addressed? Or is it simply not feasible in AWS Lambda?
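One possible direction, sketched here under the assumption that caio's pure-Python backend only needs the fire-and-forget apply_async call visible in the trace (plus close/join), is a small adapter that keeps ThreadPool's calling convention but is backed by ThreadPoolExecutor. The class name is hypothetical:

```python
from concurrent.futures.thread import ThreadPoolExecutor

class ApplyAsyncShim:
    """Hypothetical stand-in for multiprocessing.pool.ThreadPool, covering
    only the methods visible in the trace above: apply_async, close, join."""

    def __init__(self, pool_size):
        self._executor = ThreadPoolExecutor(max_workers=pool_size)

    def apply_async(self, func, args=(), kwds=None):
        # Future.result() plays the role of AsyncResult.get(); if caio also
        # relies on callbacks or other AsyncResult methods, this would need
        # to be extended accordingly.
        return self._executor.submit(func, *args, **(kwds or {}))

    def close(self):
        pass  # ThreadPoolExecutor has no separate close step

    def join(self):
        self._executor.shutdown(wait=True)
```

Setting `self.pool = ApplyAsyncShim(pool_size)` in python_aio.py would then leave the existing apply_async call sites untouched; whether any other ThreadPool methods are needed would have to be checked against the installed caio version.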
Thanks all!
Hi all, everything works fine locally, but when deployed on AWS Lambda the simple_file.py application doesn't work. Here are the steps I executed:
But when testing, I get the error "[Errno 38] Function not implemented"; below (*) is the log from CloudWatch.
Any idea what the reason is? Am I missing anything, or should the simple_file.py script be adapted/modified in some way?
Thanks a lot!! Bye!
(*)
[ERROR] OSError: [Errno 38] Function not implemented
Traceback (most recent call last):
  File "/var/lang/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 850, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/var/task/simple_file.py", line 53, in <module>
    loop.run_until_complete(basic_transcribe())
  File "/var/lang/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/var/task/simple_file.py", line 50, in basic_transcribe
    await asyncio.gather(write_chunks(), handler.handle_events())
  File "/var/task/simple_file.py", line 42, in write_chunks
    async with aiofile.AIOFile('s3://transcr12092021/test/test.wav', 'rb') as afp:
  File "/var/task/aiofile/aio.py", line 124, in __init__
    self.__context = context or get_default_context()
  File "/var/task/aiofile/aio.py", line 307, in get_default_context
    return create_context()
  File "/var/task/aiofile/aio.py", line 294, in create_context
    context = caio.AsyncioContext(max_requests, loop=loop)
  File "/var/task/caio/asyncio_base.py", line 22, in __init__
    self.context = self._create_context(max_requests, **kwargs)
  File "/var/task/caio/asyncio_base.py", line 25, in _create_context
    return self.CONTEXT_CLASS(max_requests=max_requests, **kwargs)
  File "/var/task/caio/python_aio.py", line 34, in __init__
    self.pool = ThreadPool(pool_size)
  File "/var/lang/lib/python3.9/multiprocessing/pool.py", line 927, in __init__
    Pool.__init__(self, processes, initializer, initargs)
  File "/var/lang/lib/python3.9/multiprocessing/pool.py", line 196, in __init__
    self._change_notifier = self._ctx.SimpleQueue()
  File "/var/lang/lib/python3.9/multiprocessing/context.py", line 113, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
  File "/var/lang/lib/python3.9/multiprocessing/queues.py", line 342, in __init__
    self._rlock = ctx.Lock()
  File "/var/lang/lib/python3.9/multiprocessing/context.py", line 68, in Lock
    return Lock(ctx=self.get_context())
  File "/var/lang/lib/python3.9/multiprocessing/synchronize.py", line 162, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/var/lang/lib/python3.9/multiprocessing/synchronize.py", line 57, in __init__
    sl = self._semlock = _multiprocessing.SemLock(
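The bottom frames point at the underlying limitation: multiprocessing's SemLock needs POSIX semaphores backed by shared memory, which the Lambda sandbox does not provide, hence the Errno 38. A small diagnostic probe (a hypothetical helper, not part of simple_file.py) could confirm this at runtime:

```python
import multiprocessing

def semlock_available():
    """Return True where multiprocessing synchronization primitives work;
    on AWS Lambda the underlying _multiprocessing.SemLock constructor
    raises OSError [Errno 38] Function not implemented."""
    try:
        multiprocessing.Lock()
        return True
    except OSError:
        return False
```

Calling this at the top of the handler would let the script fail fast with a clear message, or switch to a thread-based code path, instead of crashing deep inside caio.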