marqo-ai / marqo

Unified embedding generation and search engine. Also available on cloud - cloud.marqo.ai
https://www.marqo.ai/
Apache License 2.0
4.61k stars · 189 forks

[BUG] Model Cache / vectorise error when client parallel indexing #519

Open vicilliar opened 1 year ago

vicilliar commented 1 year ago

**Describe the bug** A 500 error is received a few minutes into indexing the simplewiki dataset with the hf/e5-base model using multiple client threads.

**Error Messages** From the client terminal:

Reprint test parameters: Namespace(ef_construction=128, m=16, dataset='simplewiki', model='hf/e5-base', device='cuda', os_nodes=3, marqo_ip='34.201.146.141', marqo_tag='0.0.21')
Indexing finished. Total indexing time: 41.96356773376465
Final index stats: {'numberOfDocuments': 487}
MarqoWebError: MarqoWebError Error message: {'message': "Request rejected, as this request attempted to update the model cache, while another request was updating the model cache at the same time.\n Please wait for 10 seconds and send the request again.\n Marqo's documentation can be found here: `https://docs.marqo.ai/latest/`", 'code': 'model_cache_management_error', 'type': 'invalid_request', 'link': None}
status_code: 409, type: invalid_request, code: model_cache_management_error, link:
[the 409 model_cache_management_error above is repeated five more times, once per failing client thread]
MarqoWebError: MarqoWebError Error message: Internal Server Error
status_code: 500, type: unhandled_error_type, code: unhandled_error, link:
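Until the race is fixed server-side, a client-side workaround suggested by the 409 message itself ("wait for 10 seconds and send the request again") is to retry. A minimal sketch; the `model_cache_management_error` code is taken from the messages above, while the helper name and its parameters are hypothetical:

```python
import time

def retry_on_cache_conflict(fn, retries=5, delay=10.0):
    """Call fn(); if it fails with marqo's model_cache_management_error
    (HTTP 409, raised while another request is loading the model into
    the cache), wait `delay` seconds and retry."""
    for attempt in range(retries):
        try:
            return fn()
        except Exception as e:
            # Last attempt, or an unrelated error: re-raise unchanged.
            if "model_cache_management_error" not in str(e) or attempt == retries - 1:
                raise
            time.sleep(delay)
```

Each thread could then call, e.g., `retry_on_cache_conflict(lambda: mq.index(index_name).add_documents(docs, device=args.device))` instead of calling `add_documents` directly.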

From the marqo terminal:

INFO:     34.230.156.67:4595 - "POST /indexes/recall-marqo0.0.21-simplewiki-hf.e5-base-m16-ef128/documents?refresh=false&device=cuda&use_existing_tensors=false&non_tensor_fields=docDate&non_tensor_fields=domain&non_tensor_fields=url HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File "/usr/local/lib/python3.8/dist-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/exceptions.py", line 75, in __call__
    raise exc
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/exceptions.py", line 64, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.8/dist-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/usr/local/lib/python3.8/dist-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/routing.py", line 680, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/routing.py", line 275, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/routing.py", line 65, in app
    response = await func(request)
  File "/usr/local/lib/python3.8/dist-packages/fastapi/routing.py", line 235, in app
    raw_response = await run_endpoint_function(
  File "/usr/local/lib/python3.8/dist-packages/fastapi/routing.py", line 163, in run_endpoint_function
    return await run_in_threadpool(dependant.call, **values)
  File "/usr/local/lib/python3.8/dist-packages/starlette/concurrency.py", line 41, in run_in_threadpool
    return await anyio.to_thread.run_sync(func, *args)
  File "/usr/local/lib/python3.8/dist-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.8/dist-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.8/dist-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/app/src/marqo/tensor_search/throttling/redis_throttle.py", line 87, in wrapper
    raise e
  File "/app/src/marqo/tensor_search/throttling/redis_throttle.py", line 83, in wrapper
    result = function(*args, **kwargs)
  File "/app/src/marqo/tensor_search/api.py", line 187, in add_or_replace_documents
    return tensor_search.add_documents_orchestrator(
  File "/app/src/marqo/tensor_search/tensor_search.py", line 245, in add_documents_orchestrator
    return add_documents(config=config, add_docs_params=add_docs_params)
  File "/app/src/marqo/tensor_search/tensor_search.py", line 582, in add_documents
    vector_chunks = s2_inference.vectorise(
  File "/app/src/marqo/s2_inference/s2_inference.py", line 76, in vectorise
    raise RuntimeError(f"Vectorise created an empty list of batches! Content: {content}")
RuntimeError: Vectorise created an empty list of batches! Content: []

**To Reproduce** Steps to reproduce the behavior:

  1. Run a marqo instance (0.0.21 tag)

    • AMI: Deep Learning AMI Neuron PyTorch 1.13 (Ubuntu 20.04) 20230613 (Quickstart: ami-000c1abd6dad09158)
    • TCP port 8882 exposed
    • g4dn.2xlarge instance
    • 200 GiB gp3 storage
  2. Run a managed OpenSearch instance

    • OpenSearch 2.5, 1-AZ
    • r6g.2xlarge.search, 32 GiB memory
    • 3 nodes, 300 GiB EBS storage per node
    • Public access; Access Policy: JSON configured
    • Auto-Tune off; no automatic software updates
  3. Run a Sagemaker client

  4. Run the following multithreaded indexing script


    index_settings = {
        "index_defaults": {
            "treat_urls_and_pointers_as_images": False if args.dataset == "simplewiki" else True,
            "model": args.model,
            "normalize_embeddings": True,
            "ann_parameters": {
                "space_type": "cosinesimil",
                "parameters": {
                    "ef_construction": args.ef_construction,
                    "m": args.m
                }
            }
        },
        "number_of_shards": args.os_nodes,
        "number_of_replicas": 0
    }

    try:
        mq.create_index(index_name, settings_dict=index_settings)
        print(f"Successfully created index {index_name}!")
    except Exception as e:
        print(f"ERROR CREATING INDEX: {e}")
        print(f"Index {index_name} already exists! Skipping creation.")

    print("Confirming index settings are: ")
    pprint.pprint(mq.index(index_name).get_settings())

    #####################################################
    # STEP 3. indexing with marqo
    #####################################################
    # TODO: track add docs throughput

    # Add documents in parallel
    def send_docs(docs, error_queue):
        try:
            responses = mq.index(index_name).add_documents(
                docs, device=args.device, client_batch_size=10,
                non_tensor_fields=["docDate", "domain", "url"])
        except Exception as e:
            error_queue.put(e)

    t0 = time.time()

    THREAD_COUNT = 7
    split_size = math.ceil(len(data) / THREAD_COUNT)
    splits = [data[i: i + split_size] for i in range(0, len(data), split_size)]
    allocation_dict = {i: splits[i] for i in range(THREAD_COUNT)}

    error_queue = queue.Queue()
    threads = [threading.Thread(target=send_docs, args=(split, error_queue)) for split in splits]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
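Since every thread races to load hf/e5-base into the cache on its first vectorise call, one mitigation is to warm the cache single-threaded before starting the workers. A sketch; the `warm_model_cache` helper is hypothetical, and the `add_documents` arguments mirror the script above:

```python
def warm_model_cache(index, sample_doc, device="cuda"):
    """Index one document single-threaded so the model is loaded into
    marqo's cache before parallel workers start, avoiding the 409 race.
    `index` is assumed to expose marqo's add_documents API."""
    return index.add_documents([sample_doc], device=device, client_batch_size=1)
```

It would be called as `warm_model_cache(mq.index(index_name), data[0], device=args.device)` before any `thread.start()`.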



**Expected behavior**
This error should not occur.

**Additional context**
![image](https://github.com/marqo-ai/marqo/assets/26185909/66b72b9a-1c02-4737-8584-9046bd61bf1e)
![image](https://github.com/marqo-ai/marqo/assets/26185909/c9d3ce9f-eb62-4635-bd7d-94fac9e1377b)
![image](https://github.com/marqo-ai/marqo/assets/26185909/fe5088c8-43e8-4281-8791-649616a1934d)
vicilliar commented 1 year ago

Models endpoint after the bug occurred:

{'models': [{'model_name': 'hf/all_datasets_v4_MiniLM-L6', 'model_device': 'cpu'}, {'model_name': 'hf/all_datasets_v4_MiniLM-L6', 'model_device': 'cuda'}, {'model_name': 'ViT-L/14', 'model_device': 'cpu'}, {'model_name': 'ViT-L/14', 'model_device': 'cuda'}, {'model_name': 'hf/e5-base', 'model_device': 'cuda'}]}
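The models-endpoint response above could be checked programmatically before kicking off parallel indexing, to confirm the target model is already cached on the target device. A sketch; the helper name is hypothetical, and the response shape matches the output above:

```python
def model_is_loaded(models_response, model_name, device):
    """True if model_name is already cached on device, judging by the
    {'models': [{'model_name': ..., 'model_device': ...}, ...]} shape
    returned by marqo's models endpoint."""
    return any(m["model_name"] == model_name and m["model_device"] == device
               for m in models_response.get("models", []))
```

For the response shown here, `model_is_loaded(resp, 'hf/e5-base', 'cuda')` would be true while `model_is_loaded(resp, 'hf/e5-base', 'cpu')` would be false.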
vicilliar commented 1 year ago

500 error issue split into separate issue: https://github.com/marqo-ai/marqo/issues/520