run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai

[Bug]: LlamaIndexInstrumentor not working properly for BaseEmbeddings.get_text_embedding_batch #15591

Open P1et1e opened 3 weeks ago

P1et1e commented 3 weeks ago

Bug Description

I want to add observability for loading data into a VectorStoreIndex. Following the documentation for instrumentation and using Arize Phoenix for observability, only the last batch of generated embeddings is traced. When I look at the Attributes tab, however, all chunks are shown in the input field. How do I properly track all embeddings that are created during index creation?

Version

0.10.65

Steps to Reproduce

Follow the instructions for setting up observability: https://docs.arize.com/phoenix/tracing/integrations-tracing/llamaindex. Then use SimpleDirectoryReader to load documents and create a VectorStoreIndex from them.
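
For reference, a minimal reproduction sketch along the lines of the linked docs (package names follow the Phoenix documentation; assumes a local Phoenix instance, an API key for the default embedding model, and a ./data folder with documents):

import phoenix as px
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
from phoenix.otel import register

from llama_index.core import SimpleDirectoryReader, VectorStoreIndex

px.launch_app()                # start the local Phoenix UI / collector
tracer_provider = register()   # OTel tracer provider pointed at Phoenix
LlamaIndexInstrumentor().instrument(tracer_provider=tracer_provider)

documents = SimpleDirectoryReader("./data").load_data()
# Embeddings are generated here in batches of embed_batch_size;
# per the report, only the last batch shows up as a trace in Phoenix.
index = VectorStoreIndex.from_documents(documents)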

Relevant Logs/Tracebacks

No response

dosubot[bot] commented 3 weeks ago

To track all embeddings created during index creation with LlamaIndexInstrumentor, the EmbeddingStartEvent and EmbeddingEndEvent must be dispatched correctly for each batch in BaseEmbedding.get_text_embedding_batch, and the callback manager's events must be handled properly. Here is the relevant code snippet showing how this is done:

@dispatcher.span
def get_text_embedding_batch(
    self,
    texts: List[str],
    show_progress: bool = False,
    **kwargs: Any,
) -> List[Embedding]:
    """Get a list of text embeddings, with batching."""
    cur_batch: List[str] = []
    result_embeddings: List[Embedding] = []

    queue_with_progress = enumerate(
        get_tqdm_iterable(texts, show_progress, "Generating embeddings")
    )
    model_dict = self.to_dict()
    model_dict.pop("api_key", None)
    for idx, text in queue_with_progress:
        cur_batch.append(text)
        if idx == len(texts) - 1 or len(cur_batch) == self.embed_batch_size:
            # flush
            dispatcher.event(
                EmbeddingStartEvent(
                    model_dict=model_dict,
                )
            )
            with self.callback_manager.event(
                CBEventType.EMBEDDING,
                payload={EventPayload.SERIALIZED: self.to_dict()},
            ) as event:
                embeddings = self._get_text_embeddings(cur_batch)
                result_embeddings.extend(embeddings)
                event.on_end(
                    payload={
                        EventPayload.CHUNKS: cur_batch,
                        EventPayload.EMBEDDINGS: embeddings,
                    },
                )
            dispatcher.event(
                EmbeddingEndEvent(
                    chunks=cur_batch,
                    embeddings=embeddings,
                )
            )
            cur_batch = []

    return result_embeddings

This code dispatches EmbeddingStartEvent before each batch of embeddings is generated and EmbeddingEndEvent after, and the callback manager's event context records the chunk and embedding payloads for each batch [1].
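
To verify independently that these events actually fire once per batch, you can attach a custom handler to the root dispatcher (a sketch using LlamaIndex's instrumentation API; the handler name is illustrative):

from typing import Any

from llama_index.core.instrumentation import get_dispatcher
from llama_index.core.instrumentation.event_handlers import BaseEventHandler
from llama_index.core.instrumentation.events.base import BaseEvent
from llama_index.core.instrumentation.events.embedding import (
    EmbeddingEndEvent,
    EmbeddingStartEvent,
)

class EmbeddingBatchLogger(BaseEventHandler):  # illustrative name
    """Logs every embedding batch event the dispatcher emits."""

    @classmethod
    def class_name(cls) -> str:
        return "EmbeddingBatchLogger"

    def handle(self, event: BaseEvent, **kwargs: Any) -> None:
        if isinstance(event, EmbeddingStartEvent):
            print("embedding batch started")
        elif isinstance(event, EmbeddingEndEvent):
            print(f"embedding batch finished: {len(event.chunks)} chunks")

# The root dispatcher receives events propagated from all components.
get_dispatcher().add_event_handler(EmbeddingBatchLogger())

If this handler fires once per batch while Phoenix still shows only the last batch, the dispatching itself is fine and the gap is in how the spans/events are exported to Phoenix.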

To continue talking to Dosu, mention @dosu.

P1et1e commented 2 weeks ago

@dosu This does not solve the problem; it only shows the current implementation, which does not seem to be working.

dosubot[bot] commented 2 weeks ago

To address the issue where only the last batch of generated embeddings is traced, you should ensure that the EmbeddingStartEvent and EmbeddingEndEvent are correctly dispatched for each batch and that the callback manager is properly configured to handle these events.

Here are a few steps to troubleshoot and potentially resolve the issue:

  1. Verify Dispatcher and Callback Manager Initialization: Ensure that the dispatcher and callback_manager are correctly initialized and configured. The dispatcher should be set up at the module level, and the callback manager should be properly assigned to the embedding model.

  2. Check Event Dispatching: Confirm that the EmbeddingStartEvent and EmbeddingEndEvent are dispatched for each batch. The provided code snippet should handle this, but you might want to add logging to verify that these events are being dispatched as expected.

  3. Ensure Proper Event Handling: Make sure that the callback manager is correctly handling the events. The on_event_start and on_event_end methods should be called for each event, and the payloads should be properly logged; a quick way to check this is the debug-handler sketch after this list.
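
For step 3, a quick way to see what the callback manager actually records is the built-in LlamaDebugHandler (a sketch; assumes the global Settings object is used and documents live in ./data):

from llama_index.core import Settings, SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.callbacks import CallbackManager, CBEventType, LlamaDebugHandler

debug_handler = LlamaDebugHandler(print_trace_on_end=True)
Settings.callback_manager = CallbackManager([debug_handler])

documents = SimpleDirectoryReader("./data").load_data()
index = VectorStoreIndex.from_documents(documents)

# Each flushed batch should produce one EMBEDDING start/end pair.
pairs = debug_handler.get_event_pairs(CBEventType.EMBEDDING)
print(f"{len(pairs)} embedding event pairs recorded")

If one pair is recorded per batch, the callback side is working and the problem is upstream in the tracing integration.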

Here is an enhanced version of the code snippet with additional logging to help debug the issue:

@dispatcher.span
def get_text_embedding_batch(
    self,
    texts: List[str],
    show_progress: bool = False,
    **kwargs: Any,
) -> List[Embedding]:
    """Get a list of text embeddings, with batching."""
    cur_batch: List[str] = []
    result_embeddings: List[Embedding] = []

    queue_with_progress = enumerate(
        get_tqdm_iterable(texts, show_progress, "Generating embeddings")
    )
    model_dict = self.to_dict()
    model_dict.pop("api_key", None)
    for idx, text in queue_with_progress:
        cur_batch.append(text)
        if idx == len(texts) - 1 or len(cur_batch) == self.embed_batch_size:
            # flush
            print(f"Dispatching EmbeddingStartEvent for batch {cur_batch}")
            dispatcher.event(
                EmbeddingStartEvent(
                    model_dict=model_dict,
                )
            )
            with self.callback_manager.event(
                CBEventType.EMBEDDING,
                payload={EventPayload.SERIALIZED: self.to_dict()},
            ) as event:
                embeddings = self._get_text_embeddings(cur_batch)
                result_embeddings.extend(embeddings)
                event.on_end(
                    payload={
                        EventPayload.CHUNKS: cur_batch,
                        EventPayload.EMBEDDINGS: embeddings,
                    },
                )
            print(f"Dispatching EmbeddingEndEvent for batch {cur_batch}")
            dispatcher.event(
                EmbeddingEndEvent(
                    chunks=cur_batch,
                    embeddings=embeddings,
                )
            )
            cur_batch = []

    return result_embeddings

This code adds print statements to log when EmbeddingStartEvent and EmbeddingEndEvent are dispatched. This can help you verify that the events are being dispatched for each batch.

Additionally, ensure that the integration with Arize Phoenix is correctly set up and that the events are actually captured and exported to the Phoenix server. If the issue persists, review the Arize Phoenix integration to confirm it is configured to handle the events from LlamaIndex [1][2].
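
One way to confirm the instrumentor attached at all is to inspect the root dispatcher's registered handlers after calling instrument() (a diagnostic sketch; the expectation that OpenInference handlers appear here is an assumption about how the instrumentor hooks in):

import llama_index.core.instrumentation as instrument

root = instrument.get_dispatcher()
# After LlamaIndexInstrumentor().instrument(), handlers added by the
# integration should be listed alongside the defaults (assumption).
print([h.class_name() for h in root.event_handlers])
print([h.class_name() for h in root.span_handlers])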

To continue talking to Dosu, mention @dosu.