weaviate / weaviate-python-client

A python native client for easy interaction with a Weaviate instance.
https://weaviate.io/developers/weaviate/current/client-libraries/python.html
BSD 3-Clause "New" or "Revised" License
162 stars 76 forks source link

Problem upgrading to version higher than 4.6.7 on AWS ECS Cluster #1309

Closed gariel closed 1 week ago

gariel commented 1 month ago

Hello,

This week we tried to upgrade the version of the Weaviate python client to 4.8.0 as we wanted to try the support for multi vector search. The code runs fine locally but when running in a task on AWS ECS we had problems with the connection, any request (queries, client.is_connected(), client.get_meta(), etc) stucks and we get timeout. We can see in logs that the client initialization is successful and were able to hit the /meta endpoint (also on our debug code it worked)

We also tried on version 4.7.1 with no luck

To be able to debug, I created the following FastAPI controller:

import logging
from pydantic import BaseModel
from fastapi import APIRouter
from fastapi.responses import JSONResponse
import weaviate
from weaviate.classes.query import Filter

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/ui")

class StatusBody(BaseModel):
    url: str
    key: str
    collection: str
    field: str
    value: str

@router.post("/status", response_class=JSONResponse)
async def status(body: StatusBody):
    logger.info("Status endpoint")

    with weaviate.connect_to_wcs(
        cluster_url=body.url,
        auth_credentials=weaviate.auth.AuthApiKey(body.key),
        additional_config=weaviate.classes.init.AdditionalConfig(timeout=weaviate.classes.init.Timeout(init=10)),
        skip_init_checks=True,
    ) as client:

        logger.info("Status got client")

        result = client.collections.get(body.collection).query.fetch_objects(
            filters=Filter.by_property(body.field).equal(body.value),
            limit=1
        )

        logger.info("Status got response")

        data = result.objects[0].properties
        logger.info(f"Status: {data}")

        return data

When running it we saw the logs for "Status endpoint" and "Status got client" but never "Status got response". Running locally worked fine

tsmith023 commented 1 month ago

Hi @gariel, thanks for raising this! As you say, the code works locally and I see the same with the tests in the repo and also running your snippet locally with some mock data. So my guess is there may be a communication issue between the client in the ECS environment and your deployed Weaviate.

You have specified skip_init_checks=True, which bypasses a number of checks that guarantee the client can successfully connect to the server. What happens if you change this to False? Do you see any errors printed or do you end up in the same situation where the API route simple times out?

P.S. Are you able to experiment using the new async client in your ECS environment? You can do this simply with your code snippet by modifying as so:

import logging
from pydantic import BaseModel
from fastapi import APIRouter
from fastapi.responses import JSONResponse
import weaviate
from weaviate.classes.query import Filter

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/ui")

class StatusBody(BaseModel):
    url: str
    key: str
    collection: str
    field: str
    value: str

@router.post("/status", response_class=JSONResponse)
async def status(body: StatusBody):
    logger.info("Status endpoint")

    async with weaviate.use_async_with_weaviate_cloud(
        cluster_url=body.url,
        auth_credentials=weaviate.auth.AuthApiKey(body.key),
        additional_config=weaviate.classes.init.AdditionalConfig(timeout=weaviate.classes.init.Timeout(init=10)),
        skip_init_checks=True,
    ) as client:

        logger.info("Status got client")

        result = await client.collections.get(body.collection).query.fetch_objects(
            filters=Filter.by_property(body.field).equal(body.value),
            limit=1
        )

        logger.info("Status got response")

        data = result.objects[0].properties
        logger.info(f"Status: {data}")

        return data

I'd be interested to know if this fixes things as it would indicate an issue with our new sync implementation running in deployed async environments!

gariel commented 1 month ago

Hi @tsmith023, I'm going to test these changes and send you the results

tsmith023 commented 1 month ago

@gariel, is this issue: https://github.com/weaviate/weaviate-python-client/issues/1292, potentially related to your deployment setup?

gariel commented 1 month ago

Hi @tsmith023, sorry I was unable to focus on this until now, and yes, we normally don't use gunicorn locally for development but I started the service using it locally and was able to reproduce the problem I also implemented you code for async usage but the same problem:

{"asctime": "2024-09-26 15:25:38,442", "levelname": "INFO", "message": "Status endpoint", "name": "routes.poc.ui", "process": 6705, "taskName": "starlette.middleware.base.BaseHTTPMiddleware.__call__.<locals>.call_next.<locals>.coro", "request_id": "5de542bb-1482-4245-bbda-c33c8f5cbffa"}
{"asctime": "2024-09-26 15:25:38,776", "levelname": "INFO", "message": "HTTP Request: GET https://<redacted>.gcp.weaviate.cloud/v1/meta \"HTTP/1.1 200 OK\"", "name": "httpx", "process": 6705, "taskName": "starlette.middleware.base.BaseHTTPMiddleware.__call__.<locals>.call_next.<locals>.coro", "request_id": "5de542bb-1482-4245-bbda-c33c8f5cbffa"}
{"asctime": "2024-09-26 15:25:38,779", "levelname": "INFO", "message": "Status got client", "name": "routes.poc.ui", "process": 6705, "taskName": "starlette.middleware.base.BaseHTTPMiddleware.__call__.<locals>.call_next.<locals>.coro", "request_id": "5de542bb-1482-4245-bbda-c33c8f5cbffa"}

I also removed the skip_init_checks=True, and the "Status got client" log also didn't show, it seems like it will keep hanging (no timeout or something), the curl request also keeps hanging

tsmith023 commented 1 month ago

Oh interesting! Just so I understand you correctly, you are able to reproduce the problem locally if you use gunicorn when deploying your API in a Docker compose stack or k8s cluster?

If so, could you please share the Dockerfile you are using to replicate the problem? I am using the following and am unable to reproduce the issue that you are describing:

import weaviate
from fastapi import FastAPI

app = FastAPI()

@app.get("/meta")
def get():
    with weaviate.connect_to_local(host="weaviate") as client:
        name = "Thing"
        if not client.collections.exists(name):
            collection = client.collections.create(name)
        else:
            collection = client.collections.get(name)
        return collection.query.fetch_objects()

and:

FROM python:3.12-slim

WORKDIR /app

RUN python3 -m venv venv

RUN ./venv/bin/pip install --upgrade pip
RUN ./venv/bin/pip install fastapi
RUN ./venv/bin/pip install uvicorn
RUN ./venv/bin/pip install weaviate-client
RUN ./venv/bin/pip install gunicorn

COPY ./app.py /app/app.py

CMD ["./venv/bin/gunicorn", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "app:app", "-b", "0.0.0.0:8000"]
gariel commented 1 month ago

I reproduced it running locally, just:

PYTHONPATH="src:$PYTHONPATH" exec gunicorn -c gunicorn_config.py main:app

our python files are inside the src dir

this is our gunicorn_config.py file:

import multiprocessing
import os
import sys

bind = "0.0.0.0:80"
workers = os.environ.get('WORKER_THREADS') or (multiprocessing.cpu_count() + 1)
worker_class = "uvicorn.workers.UvicornWorker"
timeout = 120

preload_app = True
accesslog = "-"
errorlog = "-"
max_requests = 2000
max_requests_jitter = 500

## just log configs, I removed as it should not be relevant
#logconfig_dict =

If you are not able to reproduce it I can create a small project here with this

Also, we are using datadog injection, I don't know if it may be relevant but DD monkey patch a lot of libraries/methods And we are using python 3.11 instead

tsmith023 commented 3 weeks ago

@gariel, I have tested your minimal setup locally:

In each case, I was unable to replicate the issue you're seeing! Now that you mention that you use DD injection that monkey-patches libraries, I think it's worthwhile to test out disabling it to determine whether it is implicated or not.

What's odd is that your client is created so the health-check must be succeeding yet when it comes to querying the data through the Search RPC method, it hangs. Can you also test by setting query=1 in your Timeout(init=10) config to see whether that helps at all?

gariel commented 3 weeks ago

hey @tsmith023 I was simplifying an example for sending you and got some points:

I created a repository with this example: https://github.com/gariel/weaviate_async_hang_example_project

but for the sake of keeping the history:

import logging
import os

from fastapi import FastAPI
from fastapi.responses import JSONResponse

from pydantic import BaseModel
import weaviate
from weaviate.classes.query import Filter

# some dependency instantiate a sync client on import/file root
_ = weaviate.connect_to_wcs(
    cluster_url=os.getenv("CLUSTER_URL"),
    auth_credentials=weaviate.auth.AuthApiKey(os.getenv("AUTH_API_KEY")),
    additional_config=weaviate.classes.init.AdditionalConfig(timeout=weaviate.classes.init.Timeout(init=10)),
)

app = FastAPI()
logger = logging.getLogger(__name__)

class StatusBody(BaseModel):
    url: str
    key: str
    collection: str
    field: str
    value: str

@app.post("/sync", response_class=JSONResponse)
async def status_sync(body: StatusBody):
    logger.info("Status endpoint")

    with weaviate.connect_to_wcs(
        cluster_url=body.url,
        auth_credentials=weaviate.auth.AuthApiKey(body.key),
        additional_config=weaviate.classes.init.AdditionalConfig(timeout=weaviate.classes.init.Timeout(init=10)),
    ) as client:

        logger.info("Status got client")

        result = client.collections.get(body.collection).query.fetch_objects(
            filters=Filter.by_property(body.field).equal(body.value),
            limit=1
        )

        logger.info("Status got response")

        data = result.objects[0].properties
        logger.info(f"Status: {data}")

        return data

@app.post("/async", response_class=JSONResponse)
async def status_async(body: StatusBody):
    logger.info("Status endpoint")

    async with weaviate.use_async_with_weaviate_cloud(
        cluster_url=body.url,
        auth_credentials=weaviate.auth.AuthApiKey(body.key),
        additional_config=weaviate.classes.init.AdditionalConfig(timeout=weaviate.classes.init.Timeout(init=10)),
    ) as client:

        logger.info("Status got client")

        result = await client.collections.get(body.collection).query.fetch_objects(
            filters=Filter.by_property(body.field).equal(body.value),
            limit=1
        )

        logger.info("Status got response")

        data = result.objects[0].properties
        logger.info(f"Status: {data}")

        return data

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=80)

The command line I used to start gunicorn:

gunicorn --bind=0.0.0.0:80 --workers=2 --worker-class "uvicorn.workers.UvicornWorker" --preload main:app

You need to have the ENVs CLUSTER_URL and AUTH_API_KEY in your shell

tsmith023 commented 1 week ago

Hi @gariel, thanks for all your help looking into this and providing the context 😁

Given your specific setup here, this is an unintended consequence of our async implementation. In short, the sync client spins up a side-car event-loop thread where it schedules coroutines to run and awaits their response effectively simulating blocking I/O. We did this as a transition period for users to migrate their APIs to the new async implementation without completely breaking everything, however your issue seems to be an edge case of this process!

The likely reason for this breaking your MRE above is that the event-loop thread is being spawned in a different Python process, due to how gunicorn works, and so any subsequent calls hang due to potentially undefined Pythonic behaviour between threads and processes

I will take your MRE and get it running locally to see if there's anything we can do to handle this edge case!

tsmith023 commented 1 week ago

I have some good news! I have a fix for this problem but it does require one change to your MRE in line with how the Python client should be used in production environments. In your MRE, you do:

_ = weaviate.connect_to_local(
    additional_config=weaviate.classes.init.AdditionalConfig(timeout=weaviate.classes.init.Timeout(init=10)),
)

without closing the client subsequently. This leads to hanging connections, which is implicated in the abnormal behaviour that we're seeing here. A more robust usage would be as so:

client = weaviate.connect_to_local(
    additional_config=weaviate.classes.init.AdditionalConfig(timeout=weaviate.classes.init.Timeout(init=10)),
)
do_stuff(client)
client.close()

With this change, and the fixes I've opened here, your use-case should be supported!