Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 147 additions & 1 deletion src/pipecat_flows/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.services.anthropic_adapter import AnthropicLLMAdapter
from pipecat.adapters.services.aws_nova_sonic_adapter import AWSNovaSonicLLMAdapter
from pipecat.adapters.services.bedrock_adapter import AWSBedrockLLMAdapter
from pipecat.adapters.services.gemini_adapter import GeminiLLMAdapter
from pipecat.adapters.services.open_ai_adapter import OpenAILLMAdapter
Expand Down Expand Up @@ -671,6 +672,146 @@ def convert_to_function_schema(self, function_def: Dict[str, Any]) -> FlowsFunct
)


class AWSNovaSonicAdapter(LLMAdapter):
"""Format adapter for AWS Nova Sonic.

Handles AWS Nova Sonic's speech-to-speech model format, converting between
OpenAI's format and Nova Sonic's as needed.
"""

def __init__(self):
"""Initialize the Nova Sonic adapter."""
super().__init__()
self.provider_adapter = AWSNovaSonicLLMAdapter()

def _get_function_name_from_dict(self, function_def: Dict[str, Any]) -> str:
"""Extract function name from Nova Sonic function definition.

Args:
function_def: Nova Sonic-formatted function definition dictionary

Returns:
Function name from the definition
"""
# Nova Sonic uses toolSpec format
if "toolSpec" in function_def:
return function_def["toolSpec"]["name"]
return function_def.get("name", "")

def format_summary_message(self, summary: str) -> dict:
"""Format summary as a user message for Nova Sonic."""
return {
"role": "user",
"content": [{"text": f"Here's a summary of the conversation:\n{summary}"}]
}

async def generate_summary(
self, llm: Any, summary_prompt: str, messages: List[dict]
) -> Optional[str]:
"""Generate summary using AWS Nova Sonic API directly.

Note: Nova Sonic is primarily a speech-to-speech model, so this may not be
the ideal way to generate summaries. Consider using a text-based model
for summary generation instead.
"""
try:
# For Nova Sonic, we'll use the same approach as Bedrock
# since Nova Sonic runs on Bedrock infrastructure
request_params = {
"modelId": llm._model,
"messages": [
{
"role": "user",
"content": [{"text": f"Conversation history: {messages}"}],
},
],
"inferenceConfig": {
"maxTokens": llm._params.max_tokens,
"temperature": llm._params.temperature,
"topP": llm._params.top_p,
},
}

if summary_prompt:
request_params["system"] = [{"text": summary_prompt}]

# Call Bedrock without streaming
response = llm._client.converse(**request_params)

# Extract the response text
if (
"output" in response
and "message" in response["output"]
and "content" in response["output"]["message"]
):
content = response["output"]["message"]["content"]
if isinstance(content, list):
for item in content:
if item.get("text"):
return item["text"]
elif isinstance(content, str):
return content

return None

except Exception as e:
logger.error(f"Nova Sonic summary generation failed: {e}", exc_info=True)
return None

def convert_to_function_schema(self, function_def: Dict[str, Any]) -> FlowsFunctionSchema:
"""Convert Nova Sonic function definition to FlowsFunctionSchema.

Args:
function_def: Nova Sonic function definition

Returns:
FlowsFunctionSchema equivalent with flow-specific fields
"""
# Initialize with default values
name = ""
description = ""
properties = {}
required = []

# Check for toolSpec format (Nova Sonic's native format)
if "toolSpec" in function_def:
tool_spec = function_def["toolSpec"]
name = tool_spec.get("name", "")
description = tool_spec.get("description", "")
input_schema = tool_spec.get("inputSchema", {})

if "json" in input_schema:
import json
schema = json.loads(input_schema["json"]) if isinstance(input_schema["json"], str) else input_schema["json"]
properties = schema.get("properties", {})
required = schema.get("required", [])
else:
# Handle standard format
name = function_def.get("name", "")
description = function_def.get("description", "")

# Handle input_schema format
if "input_schema" in function_def:
input_schema = function_def["input_schema"]
properties = input_schema.get("properties", {})
required = input_schema.get("required", [])

# Extract Flows-specific fields
handler = function_def.get("handler")
transition_to = function_def.get("transition_to")
transition_callback = function_def.get("transition_callback")

return FlowsFunctionSchema(
name=name,
description=description,
properties=properties,
required=required,
handler=handler,
transition_to=transition_to,
transition_callback=transition_callback,
)


def create_adapter(llm) -> LLMAdapter:
"""Create appropriate adapter based on LLM service type or inheritance.

Expand Down Expand Up @@ -704,6 +845,10 @@ def create_adapter(llm) -> LLMAdapter:
if llm_type == "AWSBedrockLLMService":
logger.debug("Creating Bedrock adapter")
return AWSBedrockAdapter()

if llm_type == "AWSNovaSonicLLMService":
logger.debug("Creating Nova Sonic adapter")
return AWSNovaSonicAdapter()

# Try to find OpenAILLMService for inheritance check
try:
Expand All @@ -725,6 +870,7 @@ def create_adapter(llm) -> LLMAdapter:
error_msg += "- For OpenAI: pip install 'pipecat-ai[openai]'\n"
error_msg += "- For Anthropic: pip install 'pipecat-ai[anthropic]'\n"
error_msg += "- For Google: pip install 'pipecat-ai[google]'\n"
error_msg += "- For Bedrock: pip install 'pipecat-ai[aws]'"
error_msg += "- For Bedrock: pip install 'pipecat-ai[aws]'\n"
error_msg += "- For Nova Sonic: pip install 'pipecat-ai[aws-nova-sonic]'"

raise ValueError(error_msg)