run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai

[Question]: Deployment Failed: httpcore.ConnectError: All connection attempts failed #16426

Closed: 202030481266 closed this issue 2 weeks ago

202030481266 commented 2 weeks ago


Question

from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore
from llama_index.core import VectorStoreIndex
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_deploy import (
    deploy_workflow,
    WorkflowServiceConfig,
    ControlPlaneConfig,
    SimpleMessageQueueConfig,
)
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)
from custom_docx_parser import LlamaIndexDocxClient
import json
import os
import asyncio
from llama_index.core.schema import IndexNode, TextNode
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.postprocessor.flag_embedding_reranker import FlagEmbeddingReranker
from llama_index.core import Settings
from llama_index.core import StorageContext
from llama_index.vector_stores.milvus import MilvusVectorStore
from FlagEmbedding import BGEM3FlagModel
from typing import List, Any
from llama_index.vector_stores.milvus.utils import BaseSparseEmbeddingFunction
from llama_index.core.response.notebook_utils import display_source_node, display_response

import nest_asyncio

nest_asyncio.apply()

embed_model = HuggingFaceEmbedding(model_name='/zhangshuhai/work/embed/Xorbits/bge-m3', device="cuda:1")

llm = Ollama(
    model="qwen2.5:72b", 
    context_window=32768, 
    temperature=0.7,
    keep_alive="3h",
    request_timeout=400
)

rerank_top_5 = FlagEmbeddingReranker(model="/zhangshuhai/work/embed/AI-ModelScope/bge-reranker-v2-m3", top_n=5)
rerank_top_10 = FlagEmbeddingReranker(model="/zhangshuhai/work/embed/AI-ModelScope/bge-reranker-v2-m3", top_n=10)

# config the resource
Settings.llm = llm
Settings.embed_model = embed_model

class ProgressEvent(Event):
    progress: str

class RetrieverEvent(Event):
    """Result of running retrieval"""
    nodes: list[NodeWithScore]

class IngestEvent(Event):
    """Result of ingest process"""
    index_nodes: list[IndexNode]
    text_nodes: list[TextNode]

# * highly recommended: avoid loops in the workflow unless you have a crucial requirement
class RerankEvent1(Event):
    """Result of running reranking on retrieved nodes"""
    nodes: list[NodeWithScore]

class RerankEvent2(Event):
    """Result of running reranking on retrieved nodes"""
    nodes: list[NodeWithScore]

class PostProcessorEvent(Event):
    """Result of post processor with custom methods"""
    nodes: list[NodeWithScore]

class LLMResponseEvent(Event):
    """Result of LLM generate"""
    response: Any

# custom sparse embedding function
class FlagSparseEmbeddingFunction(BaseSparseEmbeddingFunction):
    _instance = None  # singleton pattern, avoid huge gpu memory use

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        if not hasattr(self, 'initialized'): 
            self.model = BGEM3FlagModel("/zhangshuhai/work/embed/Xorbits/bge-m3", use_fp16=False)
            self.initialized = True

    def encode_queries(self, queries: List[str]):
        outputs = self.model.encode(
            queries,
            return_dense=False,
            return_sparse=True,
            return_colbert_vecs=False,
        )["lexical_weights"]
        return [self._to_standard_dict(output) for output in outputs]

    def encode_documents(self, documents: List[str]):
        outputs = self.model.encode(
            documents,
            return_dense=False,
            return_sparse=True,
            return_colbert_vecs=False,
        )["lexical_weights"]
        return [self._to_standard_dict(output) for output in outputs]

    def _to_standard_dict(self, raw_output):
        result = {}
        for k in raw_output:
            result[int(k)] = raw_output[k]
        return result

# milvus vector database setting
milvus_vector_store = MilvusVectorStore(
    dim=1024,
    overwrite=True,
    enable_sparse=True,
    sparse_embedding_function=FlagSparseEmbeddingFunction(),
    hybrid_ranker="RRFRanker",
    hybrid_ranker_params={"k": 60},
)
milvus_storage_context = StorageContext.from_defaults(vector_store=milvus_vector_store)

class HybridRAGWorkflow(Workflow):

    @step
    async def ingest_step_1(self, ctx: Context, ev: StartEvent) -> IngestEvent | None:
        """Entry point to ingest a document, triggered by a StartEvent with `dirname`."""
        dirname = ev.get("dirname")
        if not dirname:
            return None

        # stream events as steps run
        ctx.write_event_to_stream(
            ProgressEvent(progress="Start ingest the data!")
        )

        absolute_paths = []
        for root, dirs, files in os.walk(dirname):
            for file in files:
                if file.lower().endswith('.docx'):
                    absolute_path = os.path.join(root, file)
                    absolute_paths.append(os.path.abspath(absolute_path))

        docx_client = LlamaIndexDocxClient(chunk_size=1024, docx_file_path=dirname)  # actual file path is set per file below
        index_nodes = []
        text_nodes = []
        for path in absolute_paths:
            docx_client.clear_nodes()
            docx_client.docx_file_path = path
            docx_client.process_docx_to_nodes(use_reference=True)
            index_nodes.extend(docx_client.reference_nodes)
            text_nodes.extend(docx_client.fulltext_nodes)

        return IngestEvent(index_nodes=index_nodes, text_nodes=text_nodes)

    @step
    async def index_step_2(self, ctx: Context, ev: IngestEvent) -> StopEvent | None:
        """create index and index dicts"""
        index_nodes = ev.index_nodes
        text_nodes = ev.text_nodes
        if not index_nodes or not text_nodes:
            print("Index and text nodes is empty, please run the ingest function before indexing!")
            return None

        index_nodes_dict = {node.id_ : node for node in index_nodes}
        text_nodes_dict = {node.id_ : node for node in text_nodes}
        await ctx.set('index_nodes_dict', index_nodes_dict)
        await ctx.set('text_nodes_dict', text_nodes_dict)

        # use milvus vector store
        milvus_simple_vector_index = VectorStoreIndex(nodes=index_nodes, storage_context=milvus_storage_context)

        return StopEvent(result=milvus_simple_vector_index)

    @step
    async def retrieve_step_3(
        self, ctx: Context, ev: StartEvent
    ) -> RetrieverEvent | None:
        "Entry point for RAG, triggered by a StartEvent with `query`."
        query = ev.get("query")
        index = ev.get("index")

        if not query:
            return None

        print(f"Query the documents with: {query}")

        # store the query in the global context
        await ctx.set("query", query)

        if index is None:
            print("Index is empty, load some documents before querying!")
            return None

        # we assume that we use the milvus vector index and use the hybrid retrieve mode
        retriever = index.as_retriever(vector_store_query_mode="hybrid", similarity_top_k=20)
        nodes = await retriever.aretrieve(query)
        print(f"Retrieved {len(nodes)} nodes.")
        return RetrieverEvent(nodes=nodes)

    @step
    async def rerank_step_4(self, ctx: Context, ev: RetrieverEvent) -> RerankEvent1:
        # Rerank the nodes
        print(await ctx.get("query", default=None), flush=True)
        new_nodes = rerank_top_10.postprocess_nodes(
            ev.nodes, query_str=await ctx.get("query", default=None)
        )
        print(f"Reranked nodes to {len(new_nodes)}")
        return RerankEvent1(nodes=new_nodes)

    @step
    async def rerank_step_6(self, ctx: Context, ev: PostProcessorEvent) -> RerankEvent2:
        # Rerank the nodes
        print(await ctx.get("query", default=None), flush=True)
        new_nodes = rerank_top_10.postprocess_nodes(
            ev.nodes, query_str=await ctx.get("query", default=None)
        )
        print(f"Reranked nodes to {len(new_nodes)}")
        return RerankEvent2(nodes=new_nodes)

    @step
    async def add_context_postprocess_step_5(self, ctx: Context, ev: RerankEvent1) -> PostProcessorEvent | None:
        """replace the index node for parent chunk!"""
        is_add_context = await ctx.get('is_add_context', default=None)
        if is_add_context:
            return None # already done!
        text_node_dict = await ctx.get('text_nodes_dict', default=None)
        if not text_node_dict:
            print("Text nodes ID is lost! Please restart the workflow!")
            return None

        # set the is_add_context variable
        await ctx.set('is_add_context', True)

        nodes = ev.nodes
        new_nodes = []
        nodes_id = {n.node.id_ for n in nodes}
        for n in nodes:
            # n is index node
            text_node_id = n.node.index_id
            if text_node_id not in nodes_id and text_node_id in text_node_dict:
                nodes_id.add(text_node_id)
                new_nodes.append(NodeWithScore(node=text_node_dict[text_node_id], score=n.score))
        return PostProcessorEvent(nodes=new_nodes)

    @step
    async def synthesize_step_7(self, ctx: Context, ev: RerankEvent2) -> LLMResponseEvent | None:
        """Return a streaming response using reranked nodes."""
        is_add_context = await ctx.get('is_add_context', default=None)
        if not is_add_context:
            return None
        summarizer = CompactAndRefine(llm=llm)
        query = await ctx.get("query", default=None)
        response = await summarizer.asynthesize(query, nodes=ev.nodes)
        return LLMResponseEvent(response=response)

    @step
    async def gather_step_8(self, ctx: Context, ev: LLMResponseEvent) -> StopEvent | None:
        """Gather all the answer"""
        responses = ctx.collect_events(ev, [LLMResponseEvent])
        if responses is None:
            return None
        return StopEvent(result=responses)

# async def handle_request(query: str):
#     rag_flow = HybridRAGWorkflow(timeout=400)
#     handler = rag_flow.run(dirname='./data')
#     index = await handler 
#     await handler.ctx.set('is_add_context', False)
#     result = await rag_flow.run(query=query, index=index, ctx=handler.ctx)
#     async for chunk in result.async_response_gen():
#         print(chunk, end="", flush=True)

async def main():
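    # NOTE: deploy_workflow only registers this workflow as a service; it assumes
    # a control plane and message queue are already running (started separately,
    # e.g. with llama_deploy's deploy_core) at the addresses in ControlPlaneConfig.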
    await deploy_workflow(
        workflow=HybridRAGWorkflow(timeout=400),  # increase the workflow timeout; the default is only 10s
        workflow_config=WorkflowServiceConfig(
            host="127.0.0.1", port=8002, service_name="my_workflow"
        ),
        control_plane_config=ControlPlaneConfig(),
    )

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

# ? only a StartEvent supports dict-style access like ev.get('key')
# ? the Context object is not a dict; `ctx.get` only works for keys that were previously `ctx.set`
# * set a larger workflow timeout; the default is only 10s!
# TODO use send_event() to trigger multiple events

This is my deployment code, but it causes this error:

httpcore.ConnectError: All connection attempts failed

How to fix it?

dosubot[bot] commented 2 weeks ago

The httpcore.ConnectError: All connection attempts failed error typically indicates a problem with connecting to the specified host or port. Here are some steps to troubleshoot and resolve the issue:

  1. Check Host and Port: Ensure that the host (127.0.0.1) and port (8002) specified in your WorkflowServiceConfig are correct and that the service is running on that address.

  2. Service Availability: Verify that the service you are trying to connect to is up and running. You can use tools like curl or telnet to check if the port is open and accepting connections.

  3. Firewall and Network Settings: Check if there are any firewall rules or network settings that might be blocking the connection to the specified host and port.

  4. Timeout Settings: Ensure that the timeout settings are sufficient. You have set a timeout of 400 seconds in your HybridRAGWorkflow, which should be adequate, but make sure this is correctly applied.

  5. Dependencies and Environment: Ensure all necessary dependencies are installed and that your environment is correctly set up. This includes verifying that the llama_index and related packages are properly installed.

  6. Logs and Debugging: Check any available logs for more detailed error messages that might provide additional context about the connection failure.

If these steps do not resolve the issue, consider testing the connection with a simple script to isolate the problem further.
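
For example, a minimal socket check like the sketch below can tell you whether anything is actually listening on those endpoints. The 127.0.0.1:8002 values come from your WorkflowServiceConfig; the control plane port of 8000 is an assumption based on ControlPlaneConfig defaults, so adjust it to your setup:

import socket

def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# 8002 is the workflow service port from WorkflowServiceConfig above;
# 8000 is assumed to be the control plane port -- change it if yours differs.
for name, port in [("workflow service", 8002), ("control plane", 8000)]:
    status = "reachable" if can_connect("127.0.0.1", port) else "unreachable"
    print(f"{name} at 127.0.0.1:{port} is {status}")

If the control plane endpoint is unreachable, nothing is listening there yet, which is consistent with the "All connection attempts failed" message.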


202030481266 commented 2 weeks ago

The example provided by the llama-index docs also caused an error, so I decided to abandon this solution.