```python
# Step 1: Import necessary libraries and modules
import dspy
from dspy.teleprompt import BootstrapFewShot
from dspy.evaluate.evaluate import Evaluate
from sklearn.model_selection import train_test_split
import pandas as pd
import random
import weave
import weaviate
from dspy.retrieve.weaviate_rm import WeaviateRM
from dspy.primitives.example import Example

# Import Ragas metric for factual correctness
from ragas.llms import LangchainLLMWrapper
from ragas.dataset_schema import SingleTurnSample
from ragas.metrics._factual_correctness import FactualCorrectness
from langchain_openai import ChatOpenAI
import asyncio
import nest_asyncio
# Apply nest_asyncio to avoid event loop conflicts in Weave
nest_asyncio.apply()

weave.init(project_name="dspy")
SEED = 42
random.seed(SEED)
dataset_path = './generated_testset.csv'

# Step 2: Configure the Language Model (LM) and Retrieval Model (RM)
llm = dspy.OpenAI(model='gpt-4o-mini', api_key="sk-")
client = weaviate.connect_to_wcs(
    cluster_url="",  # Replace with your WCS URL
    auth_credentials=weaviate.auth.AuthApiKey(""),  # Replace with your WCS key
    headers={
        'X-Cohere-Api-Key': ""  # Replace with your Cohere API key
    }
)
retriever_model = WeaviateRM(
    weaviate_collection_name="WeaviateBlogChunk",
    weaviate_client=client,
    k=5  # Number of top results to retrieve
)
dspy.settings.configure(lm=llm, rm=retriever_model)

# Initialize the evaluator LLM using LangChain and the OpenAI gpt-4o-mini model
evaluator_llm = LangchainLLMWrapper(
    ChatOpenAI(model="gpt-4o-mini", openai_api_key="")  # Replace with your OpenAI API key
)

# Step 3: Load and Format Your Custom Dataset
df = pd.read_csv(dataset_path)
def format_dataset(df):
    """Format the dataset into DSPy-compatible examples with initialized inputs."""
    examples = []
    for _, row in df.iterrows():
        example = Example({
            'question': row['user_input'],
            'answer': row['reference']
        }).with_inputs('question')  # Mark 'question' as the input key
        examples.append(example)
    return examples

# Split the dataset into training and evaluation sets
train_df, eval_df = train_test_split(df, train_size=20, test_size=10, random_state=SEED)
trainset = format_dataset(train_df)
devset = format_dataset(eval_df)

print(f"Trainset Size: {len(trainset)}, Devset Size: {len(devset)}")
print(f"First Trainset Example: {trainset[0]}")

# Step 4: Define the Signatures for the RAG Pipeline
class GenerateAnswer(dspy.Signature):
    """Answer questions with 1-3 sentence answers."""
    context = dspy.InputField(desc="may contain relevant facts")
    question = dspy.InputField()
    answer = dspy.OutputField(desc="Answer in 1-3 sentences")

# Step 5: Build the RAG Pipeline
class RAG(dspy.Module):
    def __init__(self, num_passages=5):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=num_passages)
        self.generate_answer = dspy.ChainOfThought(GenerateAnswer)
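
    # NOTE: the forward() method did not survive the paste above; this is a
    # hedged reconstruction based on the standard DSPy RAG pattern (retrieve
    # passages, then answer with chain-of-thought), not necessarily the exact
    # code being run.
    def forward(self, question):
        context = self.retrieve(question).passages
        prediction = self.generate_answer(context=context, question=question)
        return dspy.Prediction(context=context, answer=prediction.answer)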

# Step 6: Define the Factual Correctness Metric
def factual_correctness_metric(example, pred, trace=None):
    """Use the Ragas factual correctness metric."""
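    # NOTE: the body of this metric was lost in the paste; the sketch below is
    # an assumption about how the Ragas FactualCorrectness scorer could be
    # wired in, not necessarily the code that is freezing. It scores
    # pred.answer against example.answer and returns the float score so
    # BootstrapFewShot can compare it with metric_threshold.
    sample = SingleTurnSample(response=pred.answer, reference=example.answer)
    scorer = FactualCorrectness(llm=evaluator_llm)
    loop = asyncio.get_event_loop()  # relies on nest_asyncio.apply() above
    score = loop.run_until_complete(scorer.single_turn_ascore(sample))
    return score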

# Step 7: Compile the RAG Program with the Custom Metric and Teleprompter
teleprompter = BootstrapFewShot(
    metric=factual_correctness_metric,  # Use the factual correctness metric
    metric_threshold=0.7  # Accept scores >= 0.7
)
compiled_rag = teleprompter.compile(RAG(), trainset=trainset)

# Optional: Save the compiled RAG program
save_path = './compiled_rag_program_v1.json'
compiled_rag.save(save_path)
print(f"RAG Program compiled and saved to {save_path}.")
```

-----

Is there a better way to use async metrics? This is freezing randomly at different points.