Closed SinghCoder closed 1 month ago
To address the issue where callback managers are not set correctly in post processors, leading to missing traces for re-ranking steps, you should implement a consistent pattern across all postprocessors, similar to what's done in the SentenceTransformerRerank
, FlagEmbeddingReranker
, and CohereRerank
classes. This involves wrapping the re-ranking logic within a context manager that handles callback events and utilizing a dispatcher for event handling.
Here's a concise example to guide you through adapting your postprocessors:
from llama_index.core.callbacks import CBEventType, EventPayload
from llama_index.core.instrumentation import get_dispatcher
from llama_index.core.instrumentation.events.rerank import ReRankEndEvent, ReRankStartEvent
from llama_index.core.postprocessor.types import BaseNodePostprocessor
from llama_index.core.schema import NodeWithScore, QueryBundle
dispatcher = get_dispatcher(__name__)
class YourCustomReranker(BaseNodePostprocessor):
# Initialization and class setup
def _postprocess_nodes(self, nodes, query_bundle=None):
dispatch_event = dispatcher.get_dispatch_event()
dispatch_event(ReRankStartEvent(query=query_bundle, nodes=nodes, top_n=self.top_n, model_name=self.model))
with self.callback_manager.event(CBEventType.RERANKING, payload={
EventPayload.NODES: nodes,
EventPayload.MODEL_NAME: self.model,
EventPayload.QUERY_STR: query_bundle.query_str,
EventPayload.TOP_K: self.top_n,
}) as event:
# Re-ranking logic here
event.on_end(payload={EventPayload.NODES: new_nodes})
dispatch_event(ReRankEndEvent(nodes=new_nodes))
return new_nodes
To ensure comprehensive visibility and traceability of the postprocessing steps in your project:
with self.callback_manager.event(...)
context manager and dispatcher events (ReRankStartEvent
and ReRankEndEvent
).payload
for the event method and parameters for dispatcher events to include relevant information.event.on_end(payload={...})
and the dispatcher to signal the completion of the re-ranking process.This approach will ensure that all postprocessors, including those not initially designed for SBERT re-ranking, execute under the scope of configured callback managers and the dispatcher, allowing for the inclusion of re-ranking steps in the trace.
To continue the conversation, mention @dosu.
My proposal basically was to change
def _run_component(self, **kwargs: Any) -> Any:
"""Run component."""
output = self.postprocessor.postprocess_nodes(
kwargs["nodes"], query_str=kwargs.get("query_str", None)
)
return {"nodes": output}
inside core/postprocessors/types.py:PostprocessorComponent
to
def _run_component(self, **kwargs: Any) -> Any:
"""Run component."""
from llama_index.core.callbacks import CBEventType, EventPayload
with self.postprocessor.callback_manager.event(
CBEventType.RERANKING,
payload={
EventPayload.NODES: kwargs["nodes"],
EventPayload.MODEL_NAME: kwargs.get("model", None),
EventPayload.QUERY_STR: kwargs.get("query_str", None),
EventPayload.TOP_K: kwargs.get("top_n", kwargs.get("top_k", None)),
},
) as event:
output = self.postprocessor.postprocess_nodes(
kwargs["nodes"], query_str=kwargs.get("query_str", None)
)
event.on_end(payload={EventPayload.NODES: output})
return {"nodes": output}
but, when trying out I realized that the model
argument and top_k
thing might be very specific to each integration, i.e. different names or something?
The one I provided is a "hacky" way.. Love to hear your thoughts on if there is something better way to do this.
To effectively ensure that all node post processors run under the scope of configured callback managers while accommodating integration-specific arguments like model
and top_k
, you can refine the _run_component
method in PostprocessorComponent
. Here's an adapted approach that maintains flexibility for handling additional, integration-specific arguments:
def _run_component(self, **kwargs: Any) -> Any:
"""Run component with callback manager scope and support for integration-specific arguments."""
with self.postprocessor.callback_manager:
# Pass all kwargs to postprocess_nodes, including 'model' and 'top_k'
output = self.postprocessor.postprocess_nodes(
kwargs["nodes"], query_str=kwargs.get("query_str", None), **kwargs
)
return {"nodes": output}
This modification ensures that the execution of postprocess_nodes
is scoped within the callback manager, crucial for capturing traces effectively during re-ranking steps. By passing **kwargs
to postprocess_nodes
, the method remains flexible to handle additional arguments without direct modifications to its interface. Ensure that your postprocess_nodes
method in BaseNodePostprocessor
implementations is prepared to handle these extra arguments. This approach provides a structured way to include all necessary operations within the callback manager's scope, addressing the issue of missing traces for re-ranking steps.
To continue the conversation, mention @dosu.
Bug Description
It seems like most of the node post processors, when run, do not run under scope of configured callback managers, because of which traces miss the re-ranking steps..
Version
llama-index==0.10.34
Steps to Reproduce
Try this simple example:
Expectation is to get all these elements in a trace logged in
langfuse
(what I am using as observability platform).But, the trace received is attached below:
On looking into the code, in the postprocessors, we do initialize the postprocessor in the component, but only in Sbert rerank is where we use it as:
After updating my local branch, I realized, we do do this in new re-rankers added as external integrations like CohereReranker, but not in native core re-rankers except sbert..
This definitely seems like a bug..
Should this be handled at base level - inside
PostprocessorComponent._run_component()
?If we do it in base component, then not everyone would have to do copy same code in their integrations I guess..
The expected trace though is:
which comes when cohere reranker is used.
Relevant Logs/Tracbacks
No response