langchain-ai / langchain

🦜🔗 Build context-aware reasoning applications
https://python.langchain.com
MIT License

ainvoke is not asynchronous #24433

Open Tolkoton opened 1 month ago

Tolkoton commented 1 month ago

Checked other resources

Example Code

async def get_response(collection_name, user_input):
    rag_chain, retriever = await get_rag_chain(embeddings=EMBEDDINGS_MODEL, collection_name=collection_name)
    response = await rag_chain.ainvoke(user_input)
    response = response.content
    return response

async def process_user_question(update: Update, context: CallbackContext) -> int:
    user_input = update.message.text
    user_id = update.effective_user.id

    if user_input == "Back":
        return await show_main_menu(update, context)

    await update.message.reply_text("Please wait, I am analyzing the current legislation... "
                                    "Preparing the answer may take a few minutes")

    collection_name = context.user_data.get('collection_name', 'default_collection')
    print(collection_name)

    response = await get_response(collection_name=collection_name, user_input=user_input)

    log_conversation(user_id=user_id, user_input=user_input, response=response)

    await update.message.reply_text(
        response + "\n\nYou can ask another question or choose 'Back' to return to the main menu.",
        reply_markup=ReplyKeyboardMarkup([["Back"]], one_time_keyboard=False))
    return USER_QUESTION

Error Message and Stack Trace (if applicable)

No response

Description

This code works, but it is not asynchronous. The one step that takes most of the time in every execution is this:

    response = await get_response(collection_name=collection_name, user_input=user_input)

It blocks the system for all other users, so ainvoke must not be working as expected.
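To make the reported symptom concrete, here is a minimal sketch of how a coroutine can be awaited and still block every other user. It is not taken from the code above: `slow_sync_work` is a hypothetical stand-in for any blocking call (a network request, model inference), and the timings are only illustrative.

```python
import asyncio
import time


def slow_sync_work(delay: float) -> str:
    # Hypothetical stand-in for any blocking call.
    time.sleep(delay)
    return "done"


async def blocking_handler(delay: float) -> str:
    # Declared async, but it never yields to the event loop while working.
    return slow_sync_work(delay)


async def offloaded_handler(delay: float) -> str:
    # The blocking call runs in a worker thread, so the event loop stays free.
    return await asyncio.to_thread(slow_sync_work, delay)


async def run_concurrently(handler, n: int, delay: float) -> float:
    # Start n handlers at once and return the total wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(*(handler(delay) for _ in range(n)))
    return time.perf_counter() - start


async def compare(n: int = 3, delay: float = 0.2) -> tuple[float, float]:
    blocked = await run_concurrently(blocking_handler, n, delay)
    offloaded = await run_concurrently(offloaded_handler, n, delay)
    return blocked, offloaded
```

Running `asyncio.run(compare())` should show the first figure growing with the number of handlers while the second stays close to a single delay. If the slow step in the handler above behaves like `blocking_handler`, the `await` keyword alone will not help.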

System Info

System Information

OS: Windows
OS Version: 10.0.22631
Python Version: 3.12.2 (tags/v3.12.2:6abddd9, Feb 6 2024, 21:26:36) [MSC v.1937 64 bit (AMD64)]

Package Information

langchain_core: 0.2.1
langchain: 0.2.1
langchain_community: 0.2.1
langsmith: 0.1.49
langchain_google_genai: 1.0.5
langchain_google_vertexai: 1.0.4
langchain_openai: 0.1.7
langchain_text_splitters: 0.2.0

Packages not installed (Not Necessarily a Problem)

The following packages were not found:

langgraph
langserve

eyurtsev commented 1 month ago

Hi @Tolkoton, there's not enough context here. Whether it's synchronous or not depends on the implementation of get_rag_chain. What does get_rag_chain do?

If this is from existing LangChain code, would you be kind enough to add a link to it?

Tolkoton commented 1 month ago

async def get_rag_chain(embeddings, collection_name):
    try:
        vector_store = ZillizDB.create_vector_store(embeddings=embeddings, collection_name=collection_name)
        print(f"Zilliz vector store created for collection: {collection_name}")
        retriever = vector_store.as_retriever()
        print("Retriever initialized.")

        template = """Give a well-reasoned answer to the question using the context.
        {context}
        Question: {question}
        Helpful Answer:"""

        model = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-v2-m3")
        compressor = CrossEncoderReranker(model=model, top_n=15)
        compression_retriever = ContextualCompressionRetriever(base_compressor=compressor, base_retriever=retriever)

        rag_prompt = PromptTemplate.from_template(template)

        rag_chain = (
            {"context": compression_retriever, "question": RunnablePassthrough()}
            | rag_prompt
            | LLM)
        print("RAG chain set up.")

        return rag_chain, compression_retriever
    except Exception as e:
        print(f"Error setting up RAG chain: {e}")
        return None, None

pm390 commented 1 month ago

@Tolkoton, I see in your code above that you use LLM in your rag_chain. Which specific LLM are you using? The behaviour you are seeing could be related to that specific LLM.

In particular, if no _acall is implemented for the LLM, a "naive run_in_executor" approach is used. In that scenario, a blocking instruction in the _call method could be the cause of what you are seeing.

For further investigation, I think knowing which LLM is used is crucial.

If no _acall implementation is present, this would be a feature request.
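The fallback described above can be sketched roughly as follows. This is an illustration only: `SketchLLM`, `BlockingLLM`, `NativeAsyncLLM` and `ask_many` are hypothetical names, and the code is not LangChain's actual implementation. It just shows a default `_acall` that pushes the synchronous `_call` into the default thread pool, next to a subclass that provides a native async path.

```python
import asyncio
import time
from functools import partial


class SketchLLM:
    """Hypothetical base class; not LangChain's actual implementation."""

    def _call(self, prompt: str) -> str:
        raise NotImplementedError

    async def _acall(self, prompt: str) -> str:
        # Default fallback: run the sync method in the default thread pool.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, partial(self._call, prompt))


class BlockingLLM(SketchLLM):
    # Only the sync path exists, so _acall falls back to the executor.
    def _call(self, prompt: str) -> str:
        time.sleep(0.1)  # stand-in for a blocking HTTP request
        return prompt.upper()


class NativeAsyncLLM(SketchLLM):
    def _call(self, prompt: str) -> str:
        time.sleep(0.1)
        return prompt.upper()

    async def _acall(self, prompt: str) -> str:
        await asyncio.sleep(0.1)  # stand-in for a non-blocking HTTP request
        return prompt.upper()


async def ask_many(llm: SketchLLM, prompts: list[str]) -> list[str]:
    # Issue all prompts concurrently through the async entry point.
    return await asyncio.gather(*(llm._acall(p) for p in prompts))
```

Both variants return the same results. The executor fallback keeps the loop responsive only while worker threads are available, which is why a native async implementation is generally preferable for a busy bot.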

@eyurtsev please correct me if I'm wrong.

Tolkoton commented 1 month ago

@pm390 @eyurtsev I am using GPT-4 Turbo. But most of the time is spent in the Milvus search and reranking, so the problem is most likely there.

eyurtsev commented 1 month ago

What does this code do? Could you link to it? I can't find it in the code base, but it's also clearly not async. I'm guessing it makes network calls.

    vector_store = ZillizDB.create_vector_store(embeddings=embeddings, collection_name=collection_name)

Tolkoton commented 1 month ago

@eyurtsev

@staticmethod
def create_vector_store(embeddings, collection_name, index_params=INDEX_PARAMS, search_params=SEARCH_PARAMS):
    vector_store = Milvus(
        embedding_function=embeddings,
        connection_args={'uri': ZILLIZ_CLUSTER_ENDPOINT, 'token': ZILLIZ_TOKEN},
        collection_name=collection_name,
        search_params=search_params,
        index_params=index_params)
    return vector_store
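Since this constructor is synchronous and get_rag_chain calls it on every request, one possible mitigation is to build each store once, in a worker thread, and reuse it. The sketch below assumes this is acceptable for the bot. `VectorStoreCache` and `fake_create_vector_store` are hypothetical names, and the fake factory merely stands in for ZillizDB.create_vector_store.

```python
import asyncio
import time


class VectorStoreCache:
    """Hypothetical helper: builds each store once, off the event loop."""

    def __init__(self, factory):
        self._factory = factory  # any blocking callable taking a collection name
        self._stores = {}
        self._lock = asyncio.Lock()

    async def get(self, collection_name: str):
        # One lock for all collections keeps the sketch simple; it also
        # guarantees the factory runs at most once per collection.
        async with self._lock:
            if collection_name not in self._stores:
                self._stores[collection_name] = await asyncio.to_thread(
                    self._factory, collection_name
                )
            return self._stores[collection_name]


created = []  # records factory calls so the reuse is observable


def fake_create_vector_store(collection_name: str) -> dict:
    # Stand-in for the blocking constructor shown above.
    created.append(collection_name)
    time.sleep(0.05)
    return {"collection": collection_name}


async def demo() -> list:
    cache = VectorStoreCache(fake_create_vector_store)
    return await asyncio.gather(
        cache.get("laws"), cache.get("laws"), cache.get("other")
    )
```

In `demo`, the two requests for "laws" share one store object and the factory runs only twice in total. This addresses the setup cost only; later searches on the store may still block.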

pm390 commented 1 month ago

@Tolkoton If the Milvus class you are using is this one, I can see that it internally instantiates a Collection object from pymilvus.

I tried digging into that object and saw that it is initialized internally and uses grpc_handler (also from pymilvus). Checking grpc_handler's methods, I see that they make calls through a gRPC stub and get a future back, but in almost all methods they then call .result() on that future, blocking execution until a response is returned.

I'm not quite sure this is the only reason, but I hope it helps further investigation.

I think the problem could also be present when later searching the vector store. Currently no custom implementation of the asynchronous methods is provided, so the base class implementation is used (which just invokes the synchronous method using run_in_executor).

Some methods like search of pymilvus collections can be invoked asynchronously by passing _async=True as a kwarg to return a concurrent.futures.Future object instead of making blocking calls.
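The difference between calling .result() on a future and awaiting it can be sketched with the standard library alone. Here `fake_search` is a hypothetical stand-in that returns a concurrent.futures.Future from a thread pool; whether the object pymilvus returns with _async=True can be passed to asyncio.wrap_future directly is an assumption that would need checking.

```python
import asyncio
import time
from concurrent.futures import Future, ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=4)


def fake_search(query: str) -> Future:
    # Stand-in for a client call that returns a future instead of blocking.
    def work() -> list[str]:
        time.sleep(0.1)
        return [f"hit for {query}"]

    return _pool.submit(work)


async def search_blocking(query: str) -> list[str]:
    # .result() blocks the calling thread, which here is the event loop thread.
    return fake_search(query).result()


async def search_awaitable(query: str) -> list[str]:
    # wrap_future turns the concurrent future into one the loop can await.
    return await asyncio.wrap_future(fake_search(query))


async def timed(search, queries: list[str]):
    # Run all searches concurrently and report results plus elapsed time.
    start = time.perf_counter()
    results = await asyncio.gather(*(search(q) for q in queries))
    return results, time.perf_counter() - start
```

With three queries, `timed(search_blocking, ...)` should take roughly three times as long as `timed(search_awaitable, ...)`, because each .result() call holds the event loop until its search finishes.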

Tolkoton commented 1 month ago

Milvus does not support asynchronous calls.