pinecone-io / canopy

Retrieval Augmented Generation (RAG) framework and context engine powered by Pinecone
https://www.pinecone.io/
Apache License 2.0
964 stars 120 forks source link

[Feature] Improve bulk uploads performance and robustness #162

Open igiloh-pinecone opened 12 months ago

igiloh-pinecone commented 12 months ago

Is this your first time submitting a feature request?

Describe the feature

The current process for one-time upload of a large dataset currently relies on naive batching, without any performance optimizations. In addition, there is almost no robustness to failures, and almost any intermediate failure.

The current KnowledgeBase.upsert() method was designed to a small set of Documents - more suitable for "follow up" data updates.
We need to either refactor the existing method or add an additional bulk_upsert() method that would be both more performant and more robust for failures.

Describe alternatives you've considered

Further analysis is required. Some of the processing steps are CPU-bound, while others are natively async.
In addition, some of the steps could benefit from different batch sizes than other (e.g. embedding model calls) - so we might consider doing the processing in some sort of a producer-consumer pipeline.

Who will this benefit?

This would affect all users, especially new users or ones starting a new project.

Are you interested in contributing this feature?

No response

Anything else?

No response

cfossguy commented 12 months ago

I think this is a crucial enhancement. I'm currently attempting to upload ~5200 parquet files and the upsert failed after ingesting ~290K embeddings ~20% of the total document base. The upsert error trigger was an OpenAI rate limit. The following capabilities would be ideal:

  1. Allow user to set a generate embedding rate limit and make the default setting below OpenAI's default rate limit.
  2. Keep track of the last successful upsert and allow the user to re-start upsert from that point. This will allow the user to fix whatever caused the error and retry. It will also help offset re-running embeddings if a rate limit kills the upsert.
  3. Make the process much faster. I realize this is tough b/c OpenAI rate limits pose a challenge but I've use asyncio to generate ~150K OpenAI embeddings in ~20 minutes. And, pinecone supports a much faster ingest rate.
deviantony commented 10 months ago

Yep I reckon that is a bit of a problem right now when using Canopy. I've been experimenting with it to upsert data using it as a library and I always face the following issue at some point during my upsert process:

File /opt/homebrew/lib/python3.10/site-packages/pinecone/core/client/rest.py:228, in RESTClientObject.request(self, method, url, query_params, headers, body, post_params, _preload_content, _request_timeout)
    225         raise NotFoundException(http_resp=r)
    227     if 500 <= r.status <= 599:
--> 228         raise ServiceException(http_resp=r)
    230     raise ApiException(http_resp=r)
    232 return r

ServiceException: (500)
Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'content-type': 'application/json', 'Content-Length': '150', 'x-pinecone-request-latency-ms': '7544', 'date': 'Mon, 11 Dec 2023 19:47:52 GMT', 'x-envoy-upstream-service-time': '90', 'server': 'envoy', 'Via': '1.1 google', 'Alt-Svc': 'h3=":443"; ma=2592000,h3-29=":443"; ma=2592000'})
HTTP response body: {"code":13,"message":"We were unable to process your request. If the problem persists, please contact us at https://support.pinecone.io/","details":[]}

I'm also casually getting this error:

File /opt/homebrew/lib/python3.10/site-packages/pinecone/index.py:277, in Index.upsert_from_dataframe(self, df, namespace, batch_size, show_progress)
    275 results = []
    276 for chunk in self._iter_dataframe(df, batch_size=batch_size):
--> 277     res = self.upsert(vectors=chunk, namespace=namespace)
    278     pbar.update(len(chunk))
    279     results.append(res)

File /opt/homebrew/lib/python3.10/site-packages/pinecone/core/utils/error_handling.py:25, in validate_and_convert_errors.<locals>.inner_func(*args, **kwargs)
     23         raise
     24 except ProtocolError as e:
---> 25     raise PineconeProtocolError(f'Failed to connect; did you specify the correct index name?') from e

PineconeProtocolError: Failed to connect; did you specify the correct index name?

This is pretty painful, especially since there is no easy way to resume the process at that stage.

igiloh-pinecone commented 10 months ago

@cfossguy @deviantony thank you both very much for your feedback!

We are aware that Canopy's current ETL process for bulk upsert is rather naive, making it slow and sometimes inefficient. We are working hard on an improved approach for that part of the system.

In the meantime, please note that you can still tweak the current behavior by controlling 3 different knobs (sadly all called batch_size, although referring to 3 different batches...):

  1. The --batch-size param of the canopy upsert command, which determined how many Documents to process each time (process in this sense is the full chunk->embed->upsert pipeline).
  2. The batch_size init argument of to KnowledgeBase class, which refers to how many vectors (==chunks) to include in each Pinecone upsert operation
  3. The batch_size argument of the RecordEncoder class, which specifies how many chunks to include in a single embedding API call.

We will work on making these more clearly named and well documented.

igiloh-pinecone commented 10 months ago

@deviantony We totally get the point about being able to save partial results, like the embedded chunk vectors, so you don't have to repeat the entire process. This is definitely part of our planned ETL improvements, as well as the ability to retry only the failed documents.

In the meantime - are you sure that the error you're getting is related to upsert rate or data size? That's not necessarily the error I would expect to see in that case.
Could you please try uploading a subset of your data, and\or lower the various batch_sizes I mentioned above, and see if this problem still persists?

deviantony commented 10 months ago

I've been using this approach to upsert data using canopy as a SDK (following https://github.com/pinecone-io/canopy/blob/main/examples/canopy-lib-quickstart.ipynb):

    for file_path in file_paths:
        print(f"Processing: {file_path}")

        data = pd.read_json(file_path, lines=True)

        # Convert the data to Canopy documents
        documents = []
        for _, row in data.iterrows():
            document_data = row.to_dict()

            metadata = document_data.get('metadata', {})
            filtered_metadata = {'title': metadata.get('title', '')}

            document = Document(
                id=str(uuid.uuid4()),
                text=document_data['text'],
                source=document_data['source'],
                metadata=filtered_metadata 
            )     

            documents.append(document)

        batch_size = 100
        for i in tqdm(range(0, len(documents), batch_size)):
            kb.upsert(documents[i: i+batch_size])

I've faced the errors above using a batch_size of 100 and a batch size of 10.

usamasaleem1 commented 10 months ago

Same here, definitely need faster upsert methods. A bulk upload is taking forever.

kowshik24 commented 5 months ago

While upserting bulk data check out this library it's easy to use: https://github.com/kowshik24/PineconeUtils/tree/main

and here is the PyPI repo: https://pypi.org/project/pineconeutils/

Thanks 😊