Yes, thank you @DiyarD ! This is my job for tomorrow :D
@okhat Omar, if you have something to test, I am happy to test it with Claude Haiku. Right now I use Sonnet because of the issue @DiyarD just outlined... :D Haiku is not smart enough, or I do not know... Darvi
@fireking77 does Claude Haiku support some kind of `response_format` like OpenAI structured outputs? Is that supported in LiteLLM?
I cannot answer your question specifically, but tomorrow I am going to research it deeper, and I will report back. Unfortunately I am not familiar with LiteLLM, but that is another thing I am going to check :) tomorrow.
For now, I have found this: https://docs.anthropic.com/en/docs/test-and-evaluate/strengthen-guardrails/increase-consistency on forcing structured output with Anthropic models.
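If I understand that page correctly, the main trick is prefilling the assistant turn so the model has to continue a JSON object. A minimal sketch, assuming the official `anthropic` SDK (the model name and prompt are just illustrative):

```python
import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

resp = client.messages.create(
    model="claude-3-haiku-20240307",
    max_tokens=256,
    messages=[
        {"role": "user", "content": "Extract name and age from: 'Bob is 29.' Reply as JSON."},
        # Prefilled assistant turn: Claude must continue this opening brace.
        {"role": "assistant", "content": "{"},
    ],
)
print("{" + resp.content[0].text)  # re-attach the prefilled brace
```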
so sorry, I need to learn a lot.
Darvi
Structured Output is extremely important, and json_repair (https://github.com/mangiucugna/json_repair) might help alleviate this issue.
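For instance (a minimal sketch using json_repair's documented `repair_json` helper):

```python
from json_repair import repair_json

broken = '{"verdicts": [true, false,]}'  # trailing comma, as small models often emit
print(repair_json(broken))              # '{"verdicts": [true, false]}'
```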
@fireking77, does Claude Haiku support a response_format similar to OpenAI's structured outputs? And is this functionality available in LiteLLM?
I believe it can support it. Based on this documentation: https://docs.aws.amazon.com/bedrock/latest/userguide/conversation-inference-supported-models-features.html, Claude 3 Haiku is capable of tool use, so it should be able to handle structured outputs.
However, it's not the same interface as LiteLLM's (I checked the documentation). The specific implementation can be found here: https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ToolConfiguration.html#bedrock-Type-runtime_ToolConfiguration-tools. I have made a CustomLM implementation, so I think I could enforce it, but the big question for me is how this works in DSPy under the hood, and how I can access the relevant info in my inherited implementation. Specifically, how DSPy generates the JSON definition for the output from the Pydantic model, and how I can access that 'text'.
In summary, I think Haiku can support structured output, and this can be enforced through tool usage. But I think after the response, it will still need JSON fixers like Outlines or others.
Darvi

One more thing: I used function calling above because, to me, populating a Pydantic model actually is a function call, but maybe I am wrong.
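To make the tool-use idea concrete, here is a rough sketch using the native Anthropic SDK rather than the Bedrock API from the link (the tool name, model, and schema are illustrative):

```python
import anthropic
import pydantic

class Verdict(pydantic.BaseModel):
    claim: str
    is_true: bool

client = anthropic.Anthropic()

resp = client.messages.create(
    model="claude-3-haiku-20240307",
    max_tokens=256,
    tools=[{
        "name": "record_verdict",  # illustrative tool name
        "description": "Record the verdict for a claim.",
        "input_schema": Verdict.model_json_schema(),  # JSON schema from the Pydantic model
    }],
    tool_choice={"type": "tool", "name": "record_verdict"},  # force this tool call
    messages=[{"role": "user", "content": "Claim: water boils at 100 C at sea level."}],
)
tool_use = next(b for b in resp.content if b.type == "tool_use")
print(Verdict.model_validate(tool_use.input))  # validated, structured output
```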
@okhat Omar,
@fireking77 does Claude Haiku support some kind of `response_format` like OpenAI structured outputs? Is that supported in LiteLLM?
I made a deployment and ran a quick, rough test using Instructor with LiteLLM (https://docs.litellm.ai/docs/tutorials/instructor). My DSPy program was able to run with Haiku! I finally managed to learn something useful, too.
So thanks for the heads up! Darvi
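For anyone following along, the pattern from that Instructor tutorial looks roughly like this (assuming a recent instructor version with `from_litellm`; the model name is illustrative):

```python
import instructor
import pydantic
from litellm import completion

class User(pydantic.BaseModel):
    name: str
    age: int

client = instructor.from_litellm(completion)

user = client.chat.completions.create(
    model="claude-3-haiku-20240307",  # illustrative; any LiteLLM-routable model
    max_retries=3,  # instructor re-asks the model on validation failure
    response_model=User,
    messages=[{"role": "user", "content": "Extract: Jason is 25 years old."}],
)
print(user)  # User(name='Jason', age=25)
```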
A really neat version of this is now in 2.5.17 --- enjoy, folks. It's really good in my experience.
No changes needed to use it. Just upgrade :-D
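For example, after upgrading, plain typed signatures should just work (a quick sketch; the model name is illustrative):

```python
import dspy

lm = dspy.LM("openai/gpt-4o-mini")  # illustrative; any LiteLLM-routable model
dspy.configure(lm=lm)

fact_checking = dspy.ChainOfThought("claims -> verdicts: list[bool]")
pred = fact_checking(claims=["Paris is in France.", "2 + 2 = 5."])
print(pred.verdicts)  # expected: [True, False]
```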
Wow. Very impressive, thank you :)
It even works with Ollama, which doesn't support OpenAI's `response_format`.
@okhat How do you implement it?
With more usage, I found out it still breaks when using Ollama + smaller models (llama 3.2 1B). I changed `chat_adapter.py` as follows to somewhat improve it: I modified `field_header_pattern`, `parse()`, and `parse_value()`:
```python
import re
import ast
import json
import enum
import inspect
import pydantic
import textwrap

from pydantic import TypeAdapter
from collections.abc import Mapping
from pydantic.fields import FieldInfo
from typing import Any, Dict, KeysView, List, Literal, NamedTuple, get_args, get_origin

from dspy.adapters.base import Adapter
from ..signatures.field import OutputField
from ..signatures.signature import SignatureMeta
from ..signatures.utils import get_dspy_field_type

# Relaxed header pattern: tolerates single or double brackets and, with re.DOTALL,
# captures the field body up to the next header (or the end of the string).
field_header_pattern = r"\[\[?\s*##\s*(.*?)\s*##\s*\]?\](.*?)((?=\[\[?\s*##)|$)"


class FieldInfoWithName(NamedTuple):
    name: str
    info: FieldInfo


# Built-in field indicating that a chat turn has been completed.
BuiltInCompletedOutputFieldInfo = FieldInfoWithName(name="completed", info=OutputField())


class ChatAdapter(Adapter):
    def __init__(self):
        pass

    def format(self, signature, demos, inputs):
        messages = []

        # Extract demos where some of the output_fields are not filled in.
        incomplete_demos = [demo for demo in demos if not all(k in demo for k in signature.fields)]
        complete_demos = [demo for demo in demos if demo not in incomplete_demos]
        incomplete_demos = [
            demo
            for demo in incomplete_demos
            if any(k in demo for k in signature.input_fields) and any(k in demo for k in signature.output_fields)
        ]

        demos = incomplete_demos + complete_demos

        messages.append({"role": "system", "content": prepare_instructions(signature)})

        for demo in demos:
            messages.append(format_turn(signature, demo, role="user", incomplete=demo in incomplete_demos))
            messages.append(format_turn(signature, demo, role="assistant", incomplete=demo in incomplete_demos))

        messages.append(format_turn(signature, inputs, role="user"))

        return messages

    def parse(self, signature, completion, _parse_values=True):
        # Find all field sections anywhere in the completion, instead of the
        # original line-by-line header matching.
        matches = re.findall(field_header_pattern, completion, re.DOTALL)
        sections = [(m[0].strip(), m[1].strip()) for m in matches]

        fields = {}
        for k, v in sections:
            if (k not in fields) and (k in signature.output_fields):
                try:
                    fields[k] = parse_value(k, v, signature.output_fields[k].annotation) if _parse_values else v
                except Exception as e:
                    raise ValueError(
                        f"Error parsing field {k}: {e}.\n\n\t\tOn attempting to parse the value\n```\n{v}\n```"
                    )

        if fields.keys() != signature.output_fields.keys():
            raise ValueError(f"Expected {signature.output_fields.keys()} but got {fields.keys()}")

        return fields

    def format_turn(self, signature, values, role, incomplete=False):
        return format_turn(signature, values, role, incomplete)


def format_blob(blob):
    if "\n" not in blob and "«" not in blob and "»" not in blob:
        return f"«{blob}»"

    modified_blob = blob.replace("\n", "\n ")
    return f"«««\n {modified_blob}\n»»»"


def format_input_list_field_value(value: List[Any]) -> str:
    """
    Formats the value of an input field of type List[Any].

    Args:
        value: The value of the list-type input field.

    Returns:
        A string representation of the input field's list value.
    """
    if len(value) == 0:
        return "N/A"
    if len(value) == 1:
        return format_blob(value[0])

    return "\n".join([f"[{idx+1}] {format_blob(txt)}" for idx, txt in enumerate(value)])


def _serialize_for_json(value):
    if isinstance(value, pydantic.BaseModel):
        return value.model_dump()
    elif isinstance(value, list):
        return [_serialize_for_json(item) for item in value]
    elif isinstance(value, dict):
        return {key: _serialize_for_json(val) for key, val in value.items()}
    else:
        return value


def _format_field_value(field_info: FieldInfo, value: Any) -> str:
    """
    Formats the value of the specified field according to the field's DSPy type (input or output),
    annotation (e.g. str, int, etc.), and the type of the value itself.

    Args:
        field_info: Information about the field, including its DSPy field type and annotation.
        value: The value of the field.

    Returns:
        The formatted value of the field, represented as a string.
    """
    if isinstance(value, list) and field_info.annotation is str:
        # If the field has no special type requirements, format it as a nicely numbered list for the LM.
        return format_input_list_field_value(value)
    elif isinstance(value, (pydantic.BaseModel, dict, list)):
        return json.dumps(_serialize_for_json(value))
    else:
        return str(value)


def format_fields(fields_with_values: Dict[FieldInfoWithName, Any]) -> str:
    """
    Formats the values of the specified fields according to the field's DSPy type (input or output),
    annotation (e.g. str, int, etc.), and the type of the value itself. Joins the formatted values
    into a single string, which is a multiline string if there are multiple fields.

    Args:
        fields_with_values: A dictionary mapping information about a field to its corresponding value.

    Returns:
        The joined formatted values of the fields, represented as a string.
    """
    output = []
    for field, field_value in fields_with_values.items():
        formatted_field_value = _format_field_value(field_info=field.info, value=field_value)
        output.append(f"[[ ## {field.name} ## ]]\n{formatted_field_value}")

    return "\n\n".join(output).strip()


def parse_value(k, value, annotation):
    if annotation is str:
        return str(value)

    parsed_value = value

    if isinstance(annotation, enum.EnumMeta):
        parsed_value = annotation[value]
    elif isinstance(value, str):
        value = value.strip(" \t\n\r\f\v`")
        try:
            parsed_value = json.loads(value)
            # Some models wrap the value in a one-key object named after the field; unwrap it.
            if isinstance(parsed_value, dict) and len(parsed_value.keys()) == 1 and k in parsed_value:
                parsed_value = parsed_value[k]
        except json.JSONDecodeError:
            try:
                parsed_value = ast.literal_eval(value)
            except (ValueError, SyntaxError):
                try:
                    # Last resort: recognize a plain-text bulleted list and split it into items.
                    list_pattern = r'^([-•●◉⦿⦾◦◘*+=◙»‣⁃⁌⁍∙○])\s*(.*?)(?=\n|$)'
                    matches = re.findall(list_pattern, value, re.MULTILINE)
                    list_item_char = set(m[0].strip() for m in matches)
                    list_items = [m[1].strip() for m in matches]
                    if len(list_item_char) != 1:
                        raise Exception("Probably not a list")
                    parsed_value = list_items
                except Exception:
                    parsed_value = value

    return TypeAdapter(annotation).validate_python(parsed_value)


def format_turn(signature: SignatureMeta, values: Dict[str, Any], role, incomplete=False) -> Dict[str, str]:
    """
    Constructs a new message ("turn") to append to a chat thread. The message is carefully formatted
    so that it can instruct an LLM to generate responses conforming to the specified DSPy signature.

    Args:
        signature: The DSPy signature to which future LLM responses should conform.
        values: A dictionary mapping field names (from the DSPy signature) to corresponding values
            that should be included in the message.
        role: The role of the message, which can be either "user" or "assistant".
        incomplete: If True, indicates that output field values are present in the set of specified
            ``values``. If False, indicates that ``values`` only contains input field values.

    Returns:
        A chat message that can be appended to a chat thread. The message contains two string fields:
        ``role`` ("user" or "assistant") and ``content`` (the message text).
    """
    content = []

    if role == "user":
        fields: Dict[str, FieldInfo] = signature.input_fields
        if incomplete:
            content.append("This is an example of the task, though some input or output fields are not supplied.")
    else:
        fields: Dict[str, FieldInfo] = signature.output_fields
        # Add the built-in field indicating that the chat turn has been completed.
        fields[BuiltInCompletedOutputFieldInfo.name] = BuiltInCompletedOutputFieldInfo.info
        values = {**values, BuiltInCompletedOutputFieldInfo.name: ""}

    if not incomplete:
        field_names: KeysView = fields.keys()
        if not set(values).issuperset(set(field_names)):
            raise ValueError(f"Expected {field_names} but got {values.keys()}")

    formatted_fields = format_fields(
        fields_with_values={
            FieldInfoWithName(name=field_name, info=field_info): values.get(
                field_name, "Not supplied for this particular example."
            )
            for field_name, field_info in fields.items()
        }
    )
    content.append(formatted_fields)

    if role == "user":
        content.append(
            "Respond with the corresponding output fields, starting with the field "
            + ", then ".join(f"`{f}`" for f in signature.output_fields)
            + ", and then ending with the marker for `completed`."
        )

    return {"role": role, "content": "\n\n".join(content).strip()}


def get_annotation_name(annotation):
    origin = get_origin(annotation)
    args = get_args(annotation)
    if origin is None:
        if hasattr(annotation, "__name__"):
            return annotation.__name__
        else:
            return str(annotation)
    else:
        args_str = ", ".join(get_annotation_name(arg) for arg in args)
        return f"{get_annotation_name(origin)}[{args_str}]"


def enumerate_fields(fields):
    parts = []
    for idx, (k, v) in enumerate(fields.items()):
        parts.append(f"{idx+1}. `{k}`")
        parts[-1] += f" ({get_annotation_name(v.annotation)})"
        parts[-1] += f": {v.json_schema_extra['desc']}" if v.json_schema_extra["desc"] != f"${{{k}}}" else ""

    return "\n".join(parts).strip()


def move_type_to_front(d):
    # Move the 'type' key to the front of the dictionary, recursively, for LLM readability/adherence.
    if isinstance(d, Mapping):
        return {k: move_type_to_front(v) for k, v in sorted(d.items(), key=lambda item: (item[0] != 'type', item[0]))}
    elif isinstance(d, list):
        return [move_type_to_front(item) for item in d]
    return d


def prepare_schema(type_):
    schema = pydantic.TypeAdapter(type_).json_schema()
    schema = move_type_to_front(schema)
    return schema


def prepare_instructions(signature: SignatureMeta):
    parts = []
    parts.append("Your input fields are:\n" + enumerate_fields(signature.input_fields))
    parts.append("Your output fields are:\n" + enumerate_fields(signature.output_fields))
    parts.append("All interactions will be structured in the following way, with the appropriate values filled in.")

    def field_metadata(field_name, field_info):
        type_ = field_info.annotation

        if get_dspy_field_type(field_info) == 'input' or type_ is str:
            desc = ""
        elif type_ is bool:
            desc = "must be True or False"
        elif type_ in (int, float):
            desc = f"must be a single {type_.__name__} value"
        elif inspect.isclass(type_) and issubclass(type_, enum.Enum):
            desc = f"must be one of: {'; '.join(type_.__members__)}"
        elif hasattr(type_, '__origin__') and type_.__origin__ is Literal:
            desc = f"must be one of: {'; '.join([str(x) for x in type_.__args__])}"
        else:
            desc = "must be parseable according to the following JSON schema: "
            desc += json.dumps(prepare_schema(type_))

        desc = (" " * 8) + f"# note: the value you produce {desc}" if desc else ""
        return f"{{{field_name}}}{desc}"

    def format_signature_fields_for_instructions(fields: Dict[str, FieldInfo]):
        return format_fields(
            fields_with_values={
                FieldInfoWithName(name=field_name, info=field_info): field_metadata(field_name, field_info)
                for field_name, field_info in fields.items()
            }
        )

    parts.append(format_signature_fields_for_instructions(signature.input_fields))
    parts.append(format_signature_fields_for_instructions(signature.output_fields))
    parts.append(format_fields({BuiltInCompletedOutputFieldInfo: ""}))

    instructions = textwrap.dedent(signature.instructions)
    objective = ("\n" + " " * 8).join([""] + instructions.splitlines())
    parts.append(f"In adhering to this structure, your objective is: {objective}")

    return "\n\n".join(parts).strip()
```
This is very hacky and unreliable.
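To see what the relaxed `field_header_pattern` tolerates, here is a standalone check (the sample completion string is made up):

```python
import re

field_header_pattern = r"\[\[?\s*##\s*(.*?)\s*##\s*\]?\](.*?)((?=\[\[?\s*##)|$)"

# A small model's output: a correct double-bracket header, then a malformed single-bracket one.
completion = "[[ ## answer ## ]]\n42\n\n[ ## completed ## ]"
matches = re.findall(field_header_pattern, completion, re.DOTALL)
print([(m[0].strip(), m[1].strip()) for m in matches])
# [('answer', '42'), ('completed', '')]
```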
If using Ollama, I also had to make this additional change to `litellm/llm/ollama.py`: replace the `get_ollama_response` function with the following:
```python
# ollama implementation
def get_ollama_response(
    model_response: litellm.ModelResponse,
    model: str,
    prompt: str,
    optional_params: dict,
    logging_obj: Any,
    encoding: Any,
    acompletion: bool = False,
    api_base="http://localhost:11434",
):
    if api_base.endswith("/api/generate"):
        url = api_base
    else:
        url = f"{api_base}/api/generate"

    ## Load Config
    config = litellm.OllamaConfig.get_config()
    for k, v in config.items():
        if (
            k not in optional_params
        ):  # completion(top_k=3) > cohere_config(top_k=3) <- allows for dynamic variables to be passed in
            optional_params[k] = v

    stream = optional_params.pop("stream", False)
    format = optional_params.pop("format", None)
    images = optional_params.pop("images", None)
    data = {
        "model": model,
        "prompt": prompt,
        "options": optional_params,
        "stream": stream,
    }
    if format is not None:
        data["format"] = format
    if images is not None:
        data["images"] = [_convert_image(image) for image in images]

    ## LOGGING
    logging_obj.pre_call(
        input=None,
        api_key=None,
        additional_args={
            "api_base": url,
            "complete_input_dict": data,
            "headers": {},
            "acompletion": acompletion,
        },
    )
    if acompletion is True:
        if stream is True:
            response = ollama_async_streaming(
                url=url,
                data=data,
                model_response=model_response,
                encoding=encoding,
                logging_obj=logging_obj,
            )
        else:
            response = ollama_acompletion(
                url=url,
                data=data,
                model_response=model_response,
                encoding=encoding,
                logging_obj=logging_obj,
            )
        return response
    elif stream is True:
        return ollama_completion_stream(url=url, data=data, logging_obj=logging_obj)

    response = requests.post(
        url=f"{url}", json={**data, "stream": stream}, timeout=litellm.request_timeout
    )
    if response.status_code != 200:
        raise OllamaError(status_code=response.status_code, message=response.text)

    ## LOGGING
    logging_obj.post_call(
        input=prompt,
        api_key="",
        original_response=response.text,
        additional_args={
            "headers": None,
            "api_base": api_base,
        },
    )

    response_json = response.json()

    ## RESPONSE OBJECT
    model_response.choices[0].finish_reason = "stop"
    if data.get("format", "") == "json":
        function_call = json.loads(response_json["response"])
        try:
            message = litellm.Message(
                content=None,
                tool_calls=[
                    {
                        "id": f"call_{str(uuid.uuid4())}",
                        "function": {
                            "name": function_call["name"],
                            "arguments": json.dumps(function_call["arguments"]),
                        },
                        "type": "function",
                    }
                ],
            )
        except KeyError:
            # If the model didn't emit a {"name": ..., "arguments": ...} object,
            # treat each top-level key as a separate function call.
            message = litellm.Message(
                content=None,
                tool_calls=[
                    {
                        "id": f"call_{str(uuid.uuid4())}",
                        "function": {
                            "name": k,
                            "arguments": json.dumps(v),
                        },
                        "type": "function",
                    }
                    for k, v in function_call.items()
                ],
            )
        model_response.choices[0].message = message  # type: ignore
        model_response.choices[0].finish_reason = "tool_calls"
    else:
        model_response.choices[0].message.content = response_json["response"]  # type: ignore
    model_response.created = int(time.time())
    model_response.model = "ollama/" + model
    prompt_tokens = response_json.get("prompt_eval_count", len(encoding.encode(prompt, disallowed_special=())))  # type: ignore
    completion_tokens = response_json.get(
        "eval_count", len(response_json.get("message", dict()).get("content", ""))
    )
    setattr(
        model_response,
        "usage",
        litellm.Usage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
        ),
    )
    return model_response
```
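As far as I can tell, LiteLLM's OllamaConfig already accepts a `format` parameter, so with this patch a call along these lines should surface the JSON reply as tool_calls (untested sketch; the model name is illustrative):

```python
import litellm

resp = litellm.completion(
    model="ollama/llama3.2:1b",  # illustrative model name
    messages=[{"role": "user", "content": 'Reply as JSON: {"name": ..., "arguments": ...}'}],
    format="json",  # routed through to Ollama by the patched function
)
print(resp.choices[0].message.tool_calls)
```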
A really neat version of this is now in 2.5.17 --- enjoy, folks. It's really good in my experience.
No changes needed to use it. Just upgrade :-D
Can you share an example of how to use a typed output field in a custom Signature class with normal dspy.Predict now?
Essentially, how do we use this: `fact_checking = dspy.ChainOfThought('claims -> verdicts: list[bool]')` with a Signature and Module class? And how do we specify retries like we did for TypedPredictor?
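Not an authoritative answer, but here is my understanding of the class-based equivalent in 2.5 (a sketch; I don't see a direct retries knob analogous to TypedPredictor's max_retries, so that part remains an open question):

```python
import dspy

class FactChecking(dspy.Signature):
    """Verify the truth of each claim."""
    claims: list[str] = dspy.InputField()
    verdicts: list[bool] = dspy.OutputField()

class FactChecker(dspy.Module):
    def __init__(self):
        super().__init__()
        self.fact_checking = dspy.ChainOfThought(FactChecking)

    def forward(self, claims: list[str]):
        return self.fact_checking(claims=claims)

pred = FactChecker()(claims=["The sky is green.", "Water is wet."])
print(pred.verdicts)  # expected: [False, True]
```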
Typed prediction is very unreliable with smaller models. Outlines is a promising solution, and it would be great if it were integrated into DSPy.