Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 162 additions & 33 deletions openhands-sdk/openhands/sdk/llm/mixins/fn_call_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import re
import sys
from collections.abc import Iterable
from typing import Literal, NotRequired, TypedDict, cast
from typing import Any, Literal, NotRequired, TypedDict, cast

from litellm import ChatCompletionToolParam, ChatCompletionToolParamFunctionChunk

Expand Down Expand Up @@ -41,6 +41,11 @@ class TextPart(TypedDict):
TASK_TRACKER_TOOL_NAME = "task_tracker"

# Inspired by: https://docs.together.ai/docs/llama-3-function-calling#function-calling-w-llama-31-70b
MISSING_DESCRIPTION_PLACEHOLDER = "No description provided"
SCHEMA_INDENT_STEP = 2
SCHEMA_UNION_KEYS = ("anyOf", "oneOf", "allOf")


system_message_suffix_TEMPLATE = """
You have access to the following functions:

Expand Down Expand Up @@ -487,6 +492,155 @@ def convert_tool_call_to_string(tool_call: dict) -> str:
return ret


def _summarize_schema_type(schema: object | None) -> str:
"""
Capture array, union, enum, and nested type info.
"""
if not isinstance(schema, dict):
return "unknown" if schema is None else str(schema)

for key in SCHEMA_UNION_KEYS:
if key in schema:
return " or ".join(_summarize_schema_type(option) for option in schema[key])

schema_type = schema.get("type")
if isinstance(schema_type, list):
return " or ".join(str(t) for t in schema_type)
if schema_type == "array":
items = schema.get("items")
if isinstance(items, list):
item_types = ", ".join(_summarize_schema_type(item) for item in items)
return f"array[{item_types}]"
if isinstance(items, dict):
return f"array[{_summarize_schema_type(items)}]"
return "array"
if schema_type:
return str(schema_type)
if "enum" in schema:
return "enum"
return "unknown"


def _indent(indent: int) -> str:
return " " * indent


def _nested_indent(indent: int, levels: int = 1) -> int:
return indent + SCHEMA_INDENT_STEP * levels


def _get_description(schema: dict[str, object] | None) -> str:
"""
Extract description from schema, or return placeholder if missing.
"""
if not isinstance(schema, dict):
return MISSING_DESCRIPTION_PLACEHOLDER
description = schema.get("description")
if isinstance(description, str) and description.strip():
return description
return MISSING_DESCRIPTION_PLACEHOLDER


def _format_union_details(schema: dict[str, object], indent: int) -> list[str] | None:
for key in SCHEMA_UNION_KEYS:
options = schema.get(key)
if not isinstance(options, list):
continue
lines = [f"{_indent(indent)}{key} options:"]
for option in options:
option_type = _summarize_schema_type(option)
option_line = f"{_indent(_nested_indent(indent))}- {option_type}"
option_line += (
f": {_get_description(option if isinstance(option, dict) else None)}"
)
lines.append(option_line)
lines.extend(_format_schema_detail(option, _nested_indent(indent, 2)))
return lines
return None


def _format_array_details(schema: dict[str, object], indent: int) -> list[str]:
lines = [f"{_indent(indent)}Array items:"]
items = schema.get("items")
if isinstance(items, list):
for index, item_schema in enumerate(items):
item_type = _summarize_schema_type(item_schema)
lines.append(
f"{_indent(_nested_indent(indent))}- index {index}: {item_type}"
)
lines.extend(_format_schema_detail(item_schema, _nested_indent(indent, 2)))
elif isinstance(items, dict):
lines.append(
f"{_indent(_nested_indent(indent))}Type: {_summarize_schema_type(items)}"
)
lines.extend(_format_schema_detail(items, _nested_indent(indent, 2)))
else:
lines.append(f"{_indent(_nested_indent(indent))}Type: unknown")
return lines


def _format_additional_properties(
additional_props: object | None, indent: int
) -> list[str]:
if isinstance(additional_props, dict):
line = (
f"{_indent(indent)}Additional properties allowed: "
f"{_summarize_schema_type(additional_props)}"
)
lines = [line]
lines.extend(_format_schema_detail(additional_props, _nested_indent(indent)))
return lines
if additional_props is True:
return [f"{_indent(indent)}Additional properties allowed."]
if additional_props is False:
return [f"{_indent(indent)}Additional properties not allowed."]
return []


def _format_object_details(schema: dict[str, Any], indent: int) -> list[str]:
lines: list[str] = []
properties = schema.get("properties", {})
required = set(schema.get("required", []))
if isinstance(properties, dict) and properties:
lines.append(f"{_indent(indent)}Object properties:")
for name, prop in properties.items():
prop_type = _summarize_schema_type(prop)
required_flag = "required" if name in required else "optional"
prop_desc = _get_description(prop if isinstance(prop, dict) else None)
lines.append(
f"{_indent(_nested_indent(indent))}- {name} ({prop_type},"
f" {required_flag}): {prop_desc}"
)
lines.extend(_format_schema_detail(prop, _nested_indent(indent, 2)))
lines.extend(
_format_additional_properties(schema.get("additionalProperties"), indent)
)
return lines


def _format_schema_detail(schema: object | None, indent: int = 4) -> list[str]:
"""Recursively describe arrays, objects, unions, and additional properties."""
if not isinstance(schema, dict):
return []

union_lines = _format_union_details(schema, indent)
if union_lines is not None:
return union_lines

schema_type = schema.get("type")
if isinstance(schema_type, list):
allowed_types = ", ".join(str(t) for t in schema_type)
return [f"{_indent(indent)}Allowed types: {allowed_types}"]

if schema_type == "array":
return _format_array_details(schema, indent)

if schema_type == "object":
return _format_object_details(schema, indent)

return []


def convert_tools_to_description(tools: list[ChatCompletionToolParam]) -> str:
ret = ""
for i, tool in enumerate(tools):
Expand All @@ -504,15 +658,14 @@ def convert_tools_to_description(tools: list[ChatCompletionToolParam]) -> str:
required_params = set(fn["parameters"].get("required", []))

for j, (param_name, param_info) in enumerate(properties.items()):
# Indicate required/optional in parentheses with type
is_required = param_name in required_params
param_status = "required" if is_required else "optional"
param_type = param_info.get("type", "string")
param_type = _summarize_schema_type(param_info)

# Get parameter description
desc = param_info.get("description", "No description provided")
desc = _get_description(
param_info if isinstance(param_info, dict) else None
)

# Handle enum values if present
if "enum" in param_info:
enum_values = ", ".join(f"`{v}`" for v in param_info["enum"])
desc += f"\nAllowed values: [{enum_values}]"
Expand All @@ -521,34 +674,10 @@ def convert_tools_to_description(tools: list[ChatCompletionToolParam]) -> str:
f" ({j + 1}) {param_name} ({param_type}, {param_status}): {desc}\n"
)

# Handle nested structure for array/object types
if param_type == "array" and "items" in param_info:
items = param_info["items"]
if items.get("type") == "object" and "properties" in items:
ret += " task_list array item structure:\n"
item_properties = items["properties"]
item_required = set(items.get("required", []))
for k, (item_param_name, item_param_info) in enumerate(
item_properties.items()
):
item_is_required = item_param_name in item_required
item_status = "required" if item_is_required else "optional"
item_type = item_param_info.get("type", "string")
item_desc = item_param_info.get(
"description", "No description provided"
)

# Handle enum values for nested items
if "enum" in item_param_info:
item_enum_values = ", ".join(
f"`{v}`" for v in item_param_info["enum"]
)
item_desc += f" Allowed values: [{item_enum_values}]"
detail_lines = _format_schema_detail(param_info, indent=6)
if detail_lines:
ret += "\n".join(detail_lines) + "\n"

ret += (
f" - {item_param_name} ({item_type}, "
f"{item_status}): {item_desc}\n"
)
else:
ret += "No parameters are required for this function.\n"

Expand Down
Loading
Loading