tensorlakeai / indexify

A realtime serving engine for Data-Intensive Generative AI Applications
https://docs.tensorlake.ai
Apache License 2.0

Serialization issue with deploying compute_graph with fastapi #924

Open sadath-12 opened 2 weeks ago

sadath-12 commented 2 weeks ago

I am trying to deploy a compute graph via a Python API server, in this case FastAPI.

Here is my FastAPI code in main.py:


from fastapi import FastAPI
# from workflows import pdf # Import the users module from endpoints

app = FastAPI()

# Include the users router with a prefix
# app.include_router(pdf.router, prefix="/pdf", tags=["pdfs"])

@app.get("/")
async def root():
    return {"message": "Indexify API"}

from pydantic import BaseModel
from indexify import indexify_function, Graph
from typing import List

class Total(BaseModel):
    val: int = 0

@indexify_function()
def generate_numbers(a: int) -> List[int]:
    return [i for i in range(a)]

@indexify_function()
def square(x: int) -> int:
    return x ** 2

@indexify_function(accumulate=Total)
def add(total: Total, new: int) -> Total:
    total.val += new
    return total

g = Graph(name="summer", start_node=generate_numbers, description="Simple Sequence Summer")
g.add_edge(generate_numbers, square)
g.add_edge(square, add)

@app.post("/deploy")
async def root():
    from indexify import RemoteGraph
    RemoteGraph.deploy(g,server_url="http://localhost:8900")
    return {"message": "created graph succesfully"}

The deployment happens successfully and I can see the graph in the UI.

But when I invoke the graph via code:

if __name__ == "__main__":
    from indexify import RemoteGraph
    #graph = RemoteGraph.deploy(g)
    graph = RemoteGraph.by_name(name="summer",server_url="http://localhost:8900")
    invocation_id = graph.run(block_until_done=True, a=10)
    result = graph.output(invocation_id, "add")
    print(result)

I get this error:

failed to fetch logs: diagnostic payload not found
stderr: 
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/indexify/executor/function_worker.py", line 139, in _run_function
    _load_function(namespace, graph_name, fn_name, code_path, version)
  File "/usr/local/lib/python3.10/site-packages/indexify/executor/function_worker.py", line 54, in _load_function
    cloudpickle.loads(pickled_functions)
ModuleNotFoundError: No module named 'main'

TaskCompleted: invocation_id='7df7b1d76622686f' fn_name='add' task_id='aa802154-a6b7-4124-bbd4-f6fbd99effd9' executor_id=None outcome='Failure'
Traceback (most recent call last):
  File "/home/ubuntu/indexify/examples/readme/readme.py", line 6, in <module>
    result = graph.output(invocation_id, "add")
  File "/root/anaconda3/envs/python10/lib/python3.10/site-packages/indexify/remote_graph.py", line 80, in output
    return self._client.graph_outputs(
  File "/root/anaconda3/envs/python10/lib/python3.10/site-packages/indexify/http_client.py", line 323, in graph_outputs
    self._fns[fn_key] = self.load_fn_wrapper(graph, fn_name)
  File "/root/anaconda3/envs/python10/lib/python3.10/site-packages/indexify/http_client.py", line 190, in load_fn_wrapper
    cloudpickle.loads(pickled_functions_by_name[fn_name])
ModuleNotFoundError: No module named 'main'

How do I tell Indexify that main is a module of my Python API and not a pip package, and that since it's not used it should be ignored?

diptanu commented 2 weeks ago

@sadath-12 At the moment we are not pickling the module where a function decorated with @indexify_function lives, so the file is not getting pickled, just the function. In this example, Total doesn't get pickled, only the functions do. generate_numbers, square, etc., work because they don't try to access any classes defined in the module.
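
Roughly what happens under the hood (a simplified sketch of the cloudpickle behavior, not the actual SDK code):

import cloudpickle

# main.py -- imported by uvicorn as the module "main"
class Total:
    val = 0

def make_total():
    return Total()  # references the module-level class

payload = cloudpickle.dumps(make_total)

# cloudpickle records Total as a reference to "main.Total" rather than
# serializing the class body, so unpickling on the executor effectively
# runs "import main", which only exists on your machine:
#     cloudpickle.loads(payload)
#     ModuleNotFoundError: No module named 'main'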

If you extended IndexifyFunction for add it would have worked, since in that case the class doesn't get wrapped by anything, and we would pickle it correctly.
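
Something like this (untested sketch; I'm assuming the accumulate decorator argument maps to an accumulate class attribute, the way name and image do in the class-based API):

from indexify.functions_sdk.indexify_functions import IndexifyFunction

class Add(IndexifyFunction):
    name = "add"
    accumulate = Total  # assumption: class-attribute form of accumulate=Total

    def run(self, total: Total, new: int) -> Total:
        total.val += new
        return total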

Here is the workaround:

@app.post("/deploy")
async def root():
    from indexify import RemoteGraph
    import sys
    RemoteGraph.deploy(g,server_url="http://localhost:8900", additional_modules=[sys.modules[__name__]])
    return {"message": "created graph succesfully"}

I have tested and this works. Going to keep this issue open so that we can figure out a way to pickle the module where the functions live.
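
For reference, additional_modules makes the SDK pickle those modules by value rather than by reference, along the lines of cloudpickle's register_pickle_by_value (my reading of the behavior described above, not necessarily the exact implementation):

import sys
import cloudpickle

# Register the calling module so its classes (e.g. Total) travel with the payload.
cloudpickle.register_pickle_by_value(sys.modules[__name__])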

sadath-12 commented 2 weeks ago

thanks @diptanu, that works for the README example but not for the PDF example I have.

API code I use to deploy the compute graph:

from fastapi import FastAPI
#from workflows import pdf # Import the users module from endpoints
#import sys
app = FastAPI()

# Include the users router with a prefix
#app.include_router(pdf.router, prefix="/pdf", tags=["pdfs"])

# uvicorn main:app --host 0.0.0.0 --port 8000 --reload

from indexify.functions_sdk.image import Image
import psycopg2
from psycopg2.extras import execute_values
from pydantic import BaseModel
from psycopg2.extras import Json
from typing import List, Optional, Dict
from inkwell import Page, Document as InkwellDocument
from indexify.functions_sdk.data_objects import File
from indexify.functions_sdk.graph import Graph
from indexify.functions_sdk.indexify_functions import IndexifyFunction, indexify_function
from pgvector.psycopg2 import register_vector
from sentence_transformers import SentenceTransformer

image1 = (
    Image()
    .name("tensorlake/indexify-executor-default")
    .base_image("ubuntu:22.04")
    .run("apt update")
    .run("apt install -y libgl1-mesa-glx git g++")
    .run("pip install torch")
    .run("pip install numpy")
    .run("pip install langchain")
    .run("pip install langchain_text_splitters")
    .run("pip install git+https://github.com/facebookresearch/detectron2.git@v0.6")
    .run("apt install -y tesseract-ocr")
    .run("apt install -y libtesseract-dev")
    .run("pip install py-inkwell")
)

class DeployInput(BaseModel):
    namespace: str
    indexifyUrl: str

class Document(BaseModel):
    pages: List[Page]

class TextChunk(BaseModel):
    chunk: str
    page_number: Optional[int] = None
    embeddings: Optional[List[float]] = None

class PdfOutput(BaseModel):
    file: File
    source: str
    dbSchema: str  
    labels: Dict[str, str]  
    size: str  
    mime_type: str  
    textchunk: List[TextChunk]
    doc: Document

@indexify_function(image=image1)
def upload_pdf(pdf: PdfOutput) -> PdfOutput:
    return pdf

class PDFParser(IndexifyFunction):
    name = "pdf-parse"
    image = image1
    description = "Parser class that captures a PDF file"

    def __init__(self):
        super().__init__()
        from inkwell import Pipeline
        self._pipeline = Pipeline()

    def run(self, input: PdfOutput) -> PdfOutput:
        import tempfile
        with tempfile.NamedTemporaryFile(mode="wb", suffix=".pdf") as f:
            f.write(input.file.data)
            document: InkwellDocument = self._pipeline.process(f.name)
        input.doc = Document(pages=document.pages)
        return input

class TextEmbeddingExtractor(IndexifyFunction):
    name = "text-embedding-extractor"
    description = "Extractor class that captures an embedding model"
    system_dependencies = []
    input_mime_types = ["text"]
    image = image1

    def __init__(self):
        super().__init__()
        self.model = None

    def run(self, pdf: PdfOutput) -> PdfOutput:
        if self.model is None:
            self.model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
        for chunk in pdf.textchunk:
            embeddings = self.model.encode(chunk.chunk)
            chunk.embeddings = embeddings.tolist()
        return pdf

@indexify_function(image=image1)
def extract_chunks(pdf: PdfOutput) -> PdfOutput:
    from inkwell import PageFragmentType
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    chunks: List[TextChunk] = []
    document = pdf.doc

    print("length of pages",len(document.pages))

    for page in document.pages:
        page_text = ""
        for fragment in page.page_fragments:
            if fragment.fragment_type in [PageFragmentType.TABLE, PageFragmentType.FIGURE]:
                chunks.append(TextChunk(chunk=fragment.content.text, page_number=page.page_number))
            elif fragment.fragment_type == PageFragmentType.TEXT:
                page_text += fragment.content.text

        texts = text_splitter.split_text(page_text)
        for text in texts:
            chunk = TextChunk(chunk=text, page_number=page.page_number)
            chunks.append(chunk)
    print("list of chunks",len(chunks)) 
    pdf.textchunk = chunks   
    return pdf

class PGVectorWriter(IndexifyFunction):
    name = "pgvector_writer"

    def __init__(self):
        super().__init__()
        self.conn = psycopg2.connect(
            dbname="test",
            user="test",
            password="test",
            host="localhost",
            port="5432"
        )
        register_vector(self.conn)
        self._create_tables()

    def _create_tables(self) -> None:
        with self.conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS text_embeddings (
                    id SERIAL PRIMARY KEY,
                    vector vector(384),
                    text TEXT,
                    page_number INT,
                    size TEXT,
                    source TEXT,
                    labels JSONB,
                    dbSchema TEXT,
                    mime_type TEXT
                );
            """)
            self.conn.commit()

    def run(self, pdf: PdfOutput) -> bool:
        print("Length of chunks is", len(pdf.textchunk))
        for chunk in pdf.textchunk:
            with self.conn.cursor() as cur:
                query = """
                    INSERT INTO text_embeddings (vector, text, page_number, size, source, labels, dbSchema, mime_type)
                    VALUES %s
                """
                values = [(chunk.embeddings, chunk.chunk, chunk.page_number, pdf.size, pdf.source, Json(pdf.labels), pdf.dbSchema, pdf.mime_type)]
                execute_values(cur, query, values)
                self.conn.commit()
        return True

from fastapi import APIRouter

#router = APIRouter()

class PdfInput(BaseModel):
    source: str
    dbSchema: str  
    labels: Dict[str, str]  
    pgHost: str
    indexifyUrl: str


@app.post("/deploy")
async def deploy_workflow(input: DeployInput):

    g = Graph(
        "pdf_graph-4",
        start_node=upload_pdf,
    )

    g.add_edge(upload_pdf, PDFParser)
    g.add_edge(PDFParser, extract_chunks)
    g.add_edge(extract_chunks, TextEmbeddingExtractor)
    g.add_edge(TextEmbeddingExtractor, PGVectorWriter)

    from indexify import RemoteGraph
    import sys
    RemoteGraph.deploy(g, server_url="http://localhost:8900", additional_modules=[sys.modules[__name__]])

    return "success"

Then I invoke it via:

from indexify import RemoteGraph
import httpx
from indexify.functions_sdk.data_objects import File
from typing import Dict, List, Optional
from inkwell import Page, Document as InkwellDocument
from pydantic import BaseModel

class PdfInput(BaseModel):
    source: str
    dbSchema: str
    labels: Dict[str, str]
    pgHost: str
    indexifyUrl: str

class Document(BaseModel):
    pages: List[Page]

class TextChunk(BaseModel):
    chunk: str
    page_number: Optional[int] = None
    embeddings: Optional[List[float]] = None

class PdfOutput(BaseModel):
    file: File
    source: str
    dbSchema: str
    labels: Dict[str, str]
    size: str
    mime_type: str
    textchunk: List[TextChunk]
    doc: Document

if __name__ == "__main__":
    #graph = RemoteGraph.deploy(g,server_url="http://localhost:8900")

    graph = RemoteGraph.by_name(name="pdf_graph-4", server_url="http://localhost:8900")

    resp = httpx.get(url="https://arxiv.org/pdf/2106.00043.pdf", follow_redirects=True)
    resp.raise_for_status()
    file = File(data=resp.content, mime_type="application/pdf")

    pdf_output = PdfOutput(
        file=file,
        source="pdf_input.source",
        dbSchema="public",
        labels={
            "test":"test"
        },
        size="100",  # File size calculation
        mime_type="pdf",
        textchunk=[],
        doc=Document(pages=[])
    )

    invocation_id = graph.run(
        block_until_done=True,
        pdf=pdf_output,
    )

    print("invocation is", invocation_id)

    print(graph.output(invocation_id, "extract_chunks"))

The error originates from the extract_chunks function.

My Indexify version:

Version: 0.2.10

diptanu commented 1 week ago

@sadath-12 Please share some steps to reproduce. I don't have enough information about how to reproduce the error.