opensearch-project / opensearch-py

Python Client for OpenSearch
https://opensearch.org/docs/latest/clients/python/
Apache License 2.0
357 stars 176 forks source link

[BUG] - parallel_bulk does not work in AWS lambda #94

Open Aarif1430 opened 2 years ago

Aarif1430 commented 2 years ago
OSError: [Errno 38] Function not implemented. I started seeing this error after upgrading to python3.9. The reason is opensearch `bulk` function is using multiprocessing module internally and ```python multiprocessing.pool.ThreadPool``` is breaking. ```python OSError: [Errno 38] Function not implemented sl = self._semlock = _multiprocessing.SemLock( SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx) File "/var/lang/lib/python3.9/multiprocessing/synchronize.py", line 57, in __init__ SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx) File "/var/lang/lib/python3.9/multiprocessing/synchronize.py", line 162, in __init__ return Lock(ctx=self.get_context()) ``` --
image

It looks like:

To Reproduce

Steps to reproduce the behavior:

  1. Deploy an application using opensearch-py==1.0.0 to aws lambda
  2. Invoke bulk function of opensearch
  3. See error

Expected behavior The opensearch client should work as it was working fine with python3.6

Plugins opensearch-py==1.0.0

Screenshots Error screenshots

image image

Host/Environment (please complete the following information):

Additional context Add any other context about the problem here.

jasongilman commented 2 years ago

I'm also seeing this error with Python 3.8

[ERROR] OSError: [Errno 38] Function not implemented
Traceback (most recent call last):
....
  File "/var/task/opensearchpy/helpers/actions.py", line 469, in parallel_bulk
    pool = BlockingPool(thread_count)
  File "/var/lang/lib/python3.8/multiprocessing/pool.py", line 925, in __init__
    Pool.__init__(self, processes, initializer, initargs)
  File "/var/lang/lib/python3.8/multiprocessing/pool.py", line 196, in __init__
    self._change_notifier = self._ctx.SimpleQueue()
  File "/var/lang/lib/python3.8/multiprocessing/context.py", line 113, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
  File "/var/lang/lib/python3.8/multiprocessing/queues.py", line 336, in __init__
    self._rlock = ctx.Lock()
  File "/var/lang/lib/python3.8/multiprocessing/context.py", line 68, in Lock
    return Lock(ctx=self.get_context())
  File "/var/lang/lib/python3.8/multiprocessing/synchronize.py", line 162, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/var/lang/lib/python3.8/multiprocessing/synchronize.py", line 57, in __init__
    sl = self._semlock = _multiprocessing.SemLock(
wbeckler commented 2 years ago

@jasongilman Did you get this error in a lambda or elsewhere?

jasongilman commented 2 years ago

@wbeckler It was in a lambda.

Aarif1430 commented 2 years ago

@jasongilman Yes it was in aws lambda.

wbeckler commented 2 years ago

Is anyone up for contributing a patch that addresses this issue when /dev/shm isn't available? There's a potential drop in replacement for the multiprocessing library: https://pypi.org/project/lambda-multiprocessing/

dblock commented 1 year ago

At a high level, is this issue about adding Python 3.9 support (starting with CI)?

saimedhi commented 1 year ago

@Aarif1430 @jasongilman Is the bug still persisting?

dblock commented 1 year ago

CI with Python 3.9 was added in https://github.com/opensearch-project/opensearch-py/pull/336 and it currently passes. We need a test that reproduces this problem.

samuelc-tm commented 12 months ago

I'm able the reproduce the issue:

Create lambda with python3.9:

import json
from multiprocessing.pool import ThreadPool

def lambda_handler(event, context):
    print("Hello")
    pool = ThreadPool()
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

Give error

{
  "errorMessage": "[Errno 38] Function not implemented",
  "errorType": "OSError",
  "stackTrace": [
    "  File \"/var/task/lambda_function.py\", line 6, in lambda_handler\n    pool = ThreadPool()\n",
    "  File \"/var/lang/lib/python3.9/multiprocessing/pool.py\", line 927, in __init__\n    Pool.__init__(self, processes, initializer, initargs)\n",
    "  File \"/var/lang/lib/python3.9/multiprocessing/pool.py\", line 196, in __init__\n    self._change_notifier = self._ctx.SimpleQueue()\n",
    "  File \"/var/lang/lib/python3.9/multiprocessing/context.py\", line 113, in SimpleQueue\n    return SimpleQueue(ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.9/multiprocessing/queues.py\", line 341, in __init__\n    self._rlock = ctx.Lock()\n",
    "  File \"/var/lang/lib/python3.9/multiprocessing/context.py\", line 68, in Lock\n    return Lock(ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.9/multiprocessing/synchronize.py\", line 162, in __init__\n    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)\n",
    "  File \"/var/lang/lib/python3.9/multiprocessing/synchronize.py\", line 57, in __init__\n    sl = self._semlock = _multiprocessing.SemLock(\n"
  ]
}
dblock commented 12 months ago

Looking at https://pypi.org/project/lambda-thread-pool/

You cannot use "multiprocessing.Queue" or "multiprocessing.Pool" within a Python Lambda environment because the Python Lambda execution environment does not support shared memory for processes.

This means we need to get rid of or be able to swap ThreadPool with LambdaThreadPool in https://github.com/opensearch-project/opensearch-py/blob/da436cbbe8dda34abd607f527d4f0bdacb9b30d8/opensearchpy/helpers/actions.py#L470.

For an immediate workaround you can copy-paste the parallel_bulk implementation and replace BlockingPool with LambdaThreadPool and see if that works. For something maintainable, I would extract BlockingPool from this implementation by adding an abstract thread pool interface, implement another one for LambdaThreadPool and add a configuration parameter to specify which thread pool to use. Anyone wants to give either a try?

dblock commented 12 months ago

I renamed this to "parallel_bulk doesn't work in AWS lambda", is there anything else that doesn't?

samuelc-tm commented 12 months ago

Thank you, in my case the ThreadPool is used by some sdk and it wouldn't be ideal to change. We started getting the issue when upgrading from python3.7 to 3.9. We might just find an alternative solution instead of using the sdk.