langchain-ai / langchain

🦜🔗 Build context-aware reasoning applications
https://python.langchain.com
MIT License

Issue: Pinecone.from_documents cannot run on AWS Lambda because of multiprocessing #11168

Closed KLia closed 7 months ago

KLia commented 1 year ago

Issue you'd like to raise.

I am writing a few apps that use LangChain and deploying them to AWS Lambda. My previous two Lambdas are running smoothly, but the one I'm trying to deploy this week throws the following stack trace when the Lambda is invoked:

  "errorType": "OSError",
  "requestId": "23ef7a93-3d00-475c-b495-421810f93d92",
  "stackTrace": [
    "  File \"/var/task/datadog_lambda/wrapper.py\", line 214, in __call__\n    self.response = self.func(event, context, **kwargs)\n",
    "  File \"/var/task/app.py\", line 4, in lambda_handler\n    total_indexed_pages = get_latest_changes()\n",
    "  File \"/var/task/ddtrace/contrib/aws_lambda/patch.py\", line 115, in __call__\n    self.response = self.func(*args, **kwargs)\n",
    "  File \"/var/task/app.py\", line 4, in lambda_handler\n    total_indexed_pages = get_latest_changes()\n",
    "  File \"/var/task/index_latest_docs.py\", line 137, in get_latest_changes\n    raise e\n",
    "  File \"/var/task/index_latest_docs.py\", line 126, in get_latest_changes\n    Pinecone.from_documents(docs, embedding=embeddings, index_name=PINECONE_INDEX_NAME, namespace=PINECONE_NAMESPACE)\n",
    "  File \"/var/task/langchain/schema/vectorstore.py\", line 420, in from_documents\n    return cls.from_texts(texts, embedding, metadatas=metadatas, **kwargs)\n",
    "  File \"/var/task/langchain/vectorstores/pinecone.py\", line 416, in from_texts\n    pinecone.add_texts(\n",
    "  File \"/var/task/langchain/vectorstores/pinecone.py\", line 138, in add_texts\n    async_res = [\n",
    "  File \"/var/task/langchain/vectorstores/pinecone.py\", line 139, in <listcomp>\n    self._index.upsert(\n",
    "  File \"/var/task/pinecone/core/utils/error_handling.py\", line 17, in inner_func\n    return func(*args, **kwargs)\n",
    "  File \"/var/task/pinecone/index.py\", line 150, in upsert\n    return self._upsert_batch(vectors, namespace, _check_type, **kwargs)\n",
    "  File \"/var/task/pinecone/index.py\", line 237, in _upsert_batch\n    return self._vector_api.upsert(\n",
    "  File \"/var/task/pinecone/core/client/api_client.py\", line 776, in __call__\n    return self.callable(self, *args, **kwargs)\n",
    "  File \"/var/task/pinecone/core/client/api/vector_operations_api.py\", line 956, in __upsert\n    return self.call_with_http_info(**kwargs)\n",
    "  File \"/var/task/pinecone/core/client/api_client.py\", line 838, in call_with_http_info\n    return self.api_client.call_api(\n",
    "  File \"/var/task/pinecone/core/client/api_client.py\", line 421, in call_api\n    return self.pool.apply_async(self.__call_api, (resource_path,\n",
    "  File \"/var/task/pinecone/core/client/api_client.py\", line 107, in pool\n    self._pool = ThreadPool(self.pool_threads)\n",
    "  File \"/var/lang/lib/python3.11/multiprocessing/pool.py\", line 930, in __init__\n    Pool.__init__(self, processes, initializer, initargs)\n",
    "  File \"/var/lang/lib/python3.11/multiprocessing/pool.py\", line 196, in __init__\n    self._change_notifier = self._ctx.SimpleQueue()\n",
    "  File \"/var/lang/lib/python3.11/multiprocessing/context.py\", line 113, in SimpleQueue\n    return SimpleQueue(ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.11/multiprocessing/queues.py\", line 341, in __init__\n    self._rlock = ctx.Lock()\n",
    "  File \"/var/lang/lib/python3.11/multiprocessing/context.py\", line 68, in Lock\n    return Lock(ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.11/multiprocessing/synchronize.py\", line 167, in __init__\n    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)\n",
    "  File \"/var/lang/lib/python3.11/multiprocessing/synchronize.py\", line 57, in __init__\n    sl = self._semlock = _multiprocessing.SemLock

On line 126 in my index_latest_docs.py file, I have the following line: Pinecone.from_documents(docs, embedding=embeddings, index_name=PINECONE_INDEX_NAME, namespace=PINECONE_NAMESPACE)

This is causing it to fail in AWS Lambda because Lambda cannot handle Python multiprocessing, which the function seems to be using internally. I don't know if this is a newly introduced issue: I'm using the exact same function in my other Lambda apps and they run fine, but maybe those containers were built with an older LangChain version than this more recent one.
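For anyone who wants to reproduce this outside of LangChain and Pinecone: the failure comes from the standard library itself, because multiprocessing locks are backed by shared-memory semaphores that Lambda's sandbox does not provide. A minimal sketch (the handler name is just an example):

from multiprocessing.pool import ThreadPool

def lambda_handler(event, context):
    # Even a *thread* pool allocates multiprocessing SemLocks internally:
    # Pool.__init__ creates a SimpleQueue, whose Lock needs a POSIX
    # shared-memory semaphore. On AWS Lambda this line raises
    # OSError: [Errno 38] Function not implemented.
    pool = ThreadPool(1)
    pool.close()
    pool.join()
    return {"statusCode": 200}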

Any help would be greatly appreciated.

Suggestion:

No response

dosubot[bot] commented 1 year ago

🤖

Hello,

Thank you for bringing this issue to our attention. The error you're encountering is likely because AWS Lambda does not support multiprocessing the way a traditional operating system does. Each invocation of a Lambda function runs in its own isolated environment; processes cannot be shared between invocations, and the sandbox does not provide the shared-memory semaphores that Python's multiprocessing primitives rely on.

The Pinecone.from_documents function you're using internally calls the Pinecone.add_texts method, which in turn calls pinecone.Index.upsert asynchronously (async_req=True). The pinecone client services async requests from a multiprocessing ThreadPool, which is the source of the issue.

To resolve this issue, you could modify the Pinecone.add_texts method to use synchronous requests instead of asynchronous ones. This would involve changing the async_req=True parameter to async_req=False in the self._index.upsert call. Here's how you could modify the method:

def add_texts(
    self,
    texts: Iterable[str],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    namespace: Optional[str] = None,
    batch_size: int = 32,
    embedding_chunk_size: int = 1000,
    **kwargs: Any,
) -> List[str]:
    # ... existing code (converts texts to a list, resolves ids and metadatas) ...

    for i in range(0, len(texts), embedding_chunk_size):
        chunk_texts = texts[i : i + embedding_chunk_size]
        chunk_ids = ids[i : i + embedding_chunk_size]
        chunk_metadatas = metadatas[i : i + embedding_chunk_size]
        embeddings = self._embed_documents(chunk_texts)
        for batch in batch_iterate(
            batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
        ):
            self._index.upsert(
                vectors=batch,
                namespace=namespace,
                async_req=False,  # Changed from True to False
                **kwargs,
            )

    return ids

Please note that this change will make the add_texts method run slower, as it will now wait for each request to complete before sending the next one. However, it should resolve the multiprocessing issue on AWS Lambda.

I hope this helps! If you have any other questions or need further clarification, please don't hesitate to ask.

Best, Dosu


This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

If you want to continue the conversation, start your reply with @dosu-bot.

KLia commented 1 year ago

I can't really change add_texts without modifying LangChain's source code.

KLia commented 1 year ago

I reverted to version 0.0.275, which my previous apps are using, and now it runs smoothly on AWS Lambda again.

LukasFides commented 1 year ago

Hi @KLia, I ran into this issue as well and it is really annoying. Thank you for putting your solution out here; I think many more people will stumble over this.

In my experience, version pinning is essential for LangChain-based services. One note I want to add, especially for the AWS world: I built AWS Step Functions with multiple Lambdas in order to have stable versions while still keeping the freedom to adopt new LangChain features. Maybe something to consider.

bgonzalezfractal commented 1 year ago

@KLia I'm using LangChain 0.0.275 and still getting the same error. Which specific version of pinecone-client are you using?

KLia commented 1 year ago

langchain==0.0.275

This is straight from my requirements.txt file. What error do you get?

bgonzalezfractal commented 1 year ago

I meant your pinecone-client version; maybe that is where my compatibility breaks.

Traceback (most recent call last):
  File "/function/data_processing/AgentProcessor.py", line 297, in process_docs_file_and_upload
    response, self.search_ids = self.vector_db_processor.upload_pinecone_pt1(labels=labels)
  File "/function/data_processing/VectorProcessor.py", line 95, in upload_pinecone_pt1
    async_results.append(index.upsert(
  File "/usr/local/lib/python3.9/site-packages/pinecone/core/utils/error_handling.py", line 17, in inner_func
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/pinecone/index.py", line 117, in upsert
    return self._upsert_batch(vectors, namespace, _check_type, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/pinecone/index.py", line 148, in _upsert_batch
    return self._vector_api.upsert(
  File "/usr/local/lib/python3.9/site-packages/pinecone/core/client/api_client.py", line 776, in __call__
    return self.callable(self, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/pinecone/core/client/api/vector_operations_api.py", line 956, in __upsert
    return self.call_with_http_info(**kwargs)
  File "/usr/local/lib/python3.9/site-packages/pinecone/core/client/api_client.py", line 838, in call_with_http_info
    return self.api_client.call_api(
  File "/usr/local/lib/python3.9/site-packages/pinecone/core/client/api_client.py", line 421, in call_api
    return self.pool.apply_async(self.__call_api, (resource_path,
  File "/usr/local/lib/python3.9/site-packages/pinecone/core/client/api_client.py", line 107, in pool
    self._pool = ThreadPool(self.pool_threads)
  File "/usr/local/lib/python3.9/multiprocessing/pool.py", line 927, in __init__
    Pool.__init__(self, processes, initializer, initargs)
  File "/usr/local/lib/python3.9/multiprocessing/pool.py", line 196, in __init__
    self._change_notifier = self._ctx.SimpleQueue()
  File "/usr/local/lib/python3.9/multiprocessing/context.py", line 113, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
  File "/usr/local/lib/python3.9/multiprocessing/queues.py", line 341, in __init__
    self._rlock = ctx.Lock()
  File "/usr/local/lib/python3.9/multiprocessing/context.py", line 68, in Lock
    return Lock(ctx=self.get_context())
  File "/usr/local/lib/python3.9/multiprocessing/synchronize.py", line 162, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/usr/local/lib/python3.9/multiprocessing/synchronize.py", line 57, in __init__
    sl = self._semlock = _multiprocessing.SemLock(
OSError: [Errno 38] Function not implemented

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/function/data_processing/AgentProcessor.py", line 209, in process_files
    s3_path, search_id = self.process_docs_file_and_upload(filepath, filename)
  File "/function/data_processing/AgentProcessor.py", line 313, in process_docs_file_and_upload
    raise Exception(e_process_docs_file)
Exception: [Errno 38] Function not implemented

KLia commented 1 year ago

Oh, sorry, I misread what you wrote. I'm not specifying any particular version for the pinecone-client library. It's just using the latest. Running the version command within the container, it looks like it's using 2.2.4.
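A quick way to confirm the versions actually resolved inside the container is the stdlib importlib.metadata (a minimal sketch; assumes pip-installed langchain and pinecone-client distributions):

from importlib.metadata import version

# Prints the installed distribution versions,
# e.g. "0.0.275" for langchain and "2.2.4" for pinecone-client here.
print(version("langchain"))
print(version("pinecone-client"))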

MikeWklm commented 1 year ago

Try using pool_threads=1 as a keyword argument:

Pinecone.from_documents(docs, embedding=embeddings, index_name=PINECONE_INDEX_NAME, namespace=PINECONE_NAMESPACE, pool_threads=1)

This will spawn a ThreadPool of size 1. If Lambda does not like that either, you will need to open an issue in the Pinecone Python client repo and ask them not to spawn a pool at all when the thread count is 1.

suresha97 commented 11 months ago

Hello, in case it's helpful to anyone else: manually setting async_req=False in the upsert() call solved the issue for me, although I also had to remove this line https://github.com/langchain-ai/langchain/blob/c0f4b95aa9961724ab4569049b4c3bc12ebbacfc/libs/langchain/langchain/vectorstores/pinecone.py#L149 so that it no longer tries to collect the results of the async requests. I only make the change to the copy of the dependency that gets zipped and uploaded (using SAM), i.e. in a separate location from site-packages, which means I don't need to alter the LangChain source code in my active development environment.

It's not pretty, but it's the only way I can currently see to use the latest version of LangChain and still get this functionality to work on Lambda.

Ideally, the async_req parameter would be parsed from kwargs instead so that it works in all cases, but I guess this change would also need to come with very clear documentation telling the user whether the requests are async by default.
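A rough sketch of what that could look like inside add_texts (hypothetical code, not LangChain's actual implementation; variable names follow the snippet quoted earlier in this thread):

# Hypothetical: parse async_req from kwargs so callers can opt out.
async_req = kwargs.pop("async_req", True)  # True keeps today's behavior
batches = batch_iterate(batch_size, zip(chunk_ids, embeddings, chunk_metadatas))
if async_req:
    # Existing behavior: fire off all upserts, then wait on the results.
    async_results = [
        self._index.upsert(vectors=batch, namespace=namespace, async_req=True, **kwargs)
        for batch in batches
    ]
    [res.get() for res in async_results]
else:
    # Synchronous path: the pinecone client never creates its ThreadPool,
    # so nothing trips over Lambda's missing multiprocessing support.
    for batch in batches:
        self._index.upsert(vectors=batch, namespace=namespace, **kwargs)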

nelsonauner commented 9 months ago

I opened a PR that addresses this issue, but the PR was closed by the maintainers without comment. It looks like the maintainers are overhauling the Pinecone file structure in https://github.com/langchain-ai/langchain/pull/16556/files, so I've asked them there to address this.

wcheek commented 8 months ago

I think I'm seeing the same thing, but the error is FileNotFoundError:

[ERROR] FileNotFoundError: [Errno 2] No such file or directory
Traceback (most recent call last):
  File "/var/lang/lib/python3.12/site-packages/aws_lambda_powertools/logging/logger.py", line 447, in decorate
    return lambda_handler(event, context, *args, **kwargs)
  File "/var/task/main.py", line 87, in lambda_handler
    Pinecone.from_documents(docs, embeddings, index_name="chatbot-vectors")
  File "/var/lang/lib/python3.12/site-packages/langchain_core/vectorstores.py", line 508, in from_documents
    return cls.from_texts(texts, embedding, metadatas=metadatas, **kwargs)
  File "/var/lang/lib/python3.12/site-packages/langchain_pinecone/vectorstores.py", line 438, in from_texts
    pinecone.add_texts(
  File "/var/lang/lib/python3.12/site-packages/langchain_pinecone/vectorstores.py", line 159, in add_texts
    self._index.upsert(
  File "/var/lang/lib/python3.12/site-packages/pinecone/utils/error_handling.py", line 10, in inner_func
    return func(*args, **kwargs)
  File "/var/lang/lib/python3.12/site-packages/pinecone/data/index.py", line 168, in upsert
    return self._upsert_batch(vectors, namespace, _check_type, **kwargs)
  File "/var/lang/lib/python3.12/site-packages/pinecone/data/index.py", line 189, in _upsert_batch
    return self._vector_api.upsert(
  File "/var/lang/lib/python3.12/site-packages/pinecone/core/client/api_client.py", line 772, in __call__
    return self.callable(self, *args, **kwargs)
  File "/var/lang/lib/python3.12/site-packages/pinecone/core/client/api/data_plane_api.py", line 1084, in __upsert
    return self.call_with_http_info(**kwargs)
  File "/var/lang/lib/python3.12/site-packages/pinecone/core/client/api_client.py", line 834, in call_with_http_info
    return self.api_client.call_api(
  File "/var/lang/lib/python3.12/site-packages/pinecone/core/client/api_client.py", line 417, in call_api
    return self.pool.apply_async(self.__call_api, (resource_path,
  File "/var/lang/lib/python3.12/site-packages/pinecone/core/client/api_client.py", line 103, in pool
    self._pool = ThreadPool(self.pool_threads)
  File "/var/lang/lib/python3.12/multiprocessing/pool.py", line 930, in __init__
    Pool.__init__(self, processes, initializer, initargs)
  File "/var/lang/lib/python3.12/multiprocessing/pool.py", line 196, in __init__
    self._change_notifier = self._ctx.SimpleQueue()
  File "/var/lang/lib/python3.12/multiprocessing/context.py", line 113, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
  File "/var/lang/lib/python3.12/multiprocessing/queues.py", line 363, in __init__
    self._rlock = ctx.Lock()
  File "/var/lang/lib/python3.12/multiprocessing/context.py", line 68, in Lock
    return Lock(ctx=self.get_context())
  File "/var/lang/lib/python3.12/multiprocessing/synchronize.py", line 169, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/var/lang/lib/python3.12/multiprocessing/synchronize.py", line 57, in __init__
    sl = self._semlock = _multiprocessing.SemLock(

This issue is preventing me from using Pinecone! Please fix it.

nelsonauner commented 8 months ago

@wcheek The issue I described should now (EDIT: this originally said "not") be fixable by using the async_req=False described in https://github.com/langchain-ai/langchain/pull/16753
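For reference, the call would then look something like this (a sketch, assuming the async_req keyword from that PR is forwarded to add_texts through from_documents's **kwargs):

from langchain_pinecone import Pinecone

Pinecone.from_documents(
    docs,
    embedding=embeddings,
    index_name=PINECONE_INDEX_NAME,
    namespace=PINECONE_NAMESPACE,
    async_req=False,  # synchronous upserts: the client never creates a ThreadPool
)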

baskaryan commented 7 months ago

@wcheek The issue I described should not be fixable by using the async_req=False described in #16753

@nelsonauner I believe you meant to say now instead of not :) closing per ^