7 changes: 7 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/__init__.py
@@ -0,0 +1,7 @@
from pydantic_ai.fastapi.agent_router import create_agent_router
from pydantic_ai.fastapi.registry import AgentRegistry

__all__ = [
'AgentRegistry',
'create_agent_router',
]
76 changes: 76 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/agent_router.py
@@ -0,0 +1,76 @@
try:
from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from openai.types.chat.chat_completion import ChatCompletion
from openai.types.model import Model
from openai.types.responses import Response as OpenAIResponse
except ImportError as _import_error: # pragma: no cover
raise ImportError(
        'Please install the `openai` and `fastapi` packages to enable the OpenAI-compatible FastAPI endpoints. '
        'You can use the `chat-completion` optional group: `pip install "pydantic-ai-slim[chat-completion]"`'
) from _import_error

from pydantic_ai.fastapi.api import AgentChatCompletionsAPI, AgentModelsAPI, AgentResponsesAPI
from pydantic_ai.fastapi.data_models import (
ChatCompletionRequest,
ModelsResponse,
ResponsesRequest,
)
from pydantic_ai.fastapi.registry import AgentRegistry


def create_agent_router(
agent_registry: AgentRegistry,
disable_responses_api: bool = False,
disable_completions_api: bool = False,
api_router: APIRouter | None = None,
) -> APIRouter:
"""FastAPI Router factory for Pydantic Agent exposure as OpenAI endpoint."""
if api_router is None:
api_router = APIRouter()
responses_api = AgentResponsesAPI(agent_registry)
completions_api = AgentChatCompletionsAPI(agent_registry)
models_api = AgentModelsAPI(agent_registry)
enable_responses_api = not disable_responses_api
enable_completions_api = not disable_completions_api

if enable_completions_api:

@api_router.post('/v1/chat/completions', response_model=ChatCompletion)
async def chat_completions( # type: ignore[reportUnusedFunction]
request: ChatCompletionRequest,
) -> ChatCompletion | StreamingResponse:
if getattr(request, 'stream', False):
                return StreamingResponse(
                    completions_api.create_streaming_completion(request),
                    media_type='text/event-stream',
                    headers={
                        'Cache-Control': 'no-cache',
                        'Connection': 'keep-alive',
                    },
                )
else:
return await completions_api.create_completion(request)

if enable_responses_api:

@api_router.post('/v1/responses', response_model=OpenAIResponse)
async def responses( # type: ignore[reportUnusedFunction]
request: ResponsesRequest,
) -> OpenAIResponse:
if getattr(request, 'stream', False):
                # TODO: add streaming support for the responses API
                raise HTTPException(status_code=501, detail='Streaming is not yet supported for the responses API')
else:
return await responses_api.create_response(request)

@api_router.get('/v1/models', response_model=ModelsResponse)
async def get_models() -> ModelsResponse: # type: ignore[reportUnusedFunction]
return await models_api.list_models()

    @api_router.get('/v1/models/{model_id}', response_model=Model)
async def get_model(model_id: str) -> Model: # type: ignore[reportUnusedFunction]
return await models_api.get_model(model_id)

return api_router
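
For orientation, here is a minimal sketch of mounting this router in an application. Only `create_agent_router` and the import path come from the diff above; the `AgentRegistry` constructor and `register` call are hypothetical stand-ins, since the registry's public API is not shown here.

```python
from fastapi import FastAPI

from pydantic_ai import Agent
from pydantic_ai.fastapi import AgentRegistry, create_agent_router

agent = Agent('openai:gpt-4o', instructions='Be concise.')

# Hypothetical registry setup: the diff only shows that the registry exposes
# get_completions_agent, get_responses_agent, and all_agents, so the
# constructor and register() below are illustrative assumptions.
registry = AgentRegistry()
registry.register('my-agent', agent)

app = FastAPI()
app.include_router(create_agent_router(registry))
```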
9 changes: 9 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/api/__init__.py
Review comment (Member): This module shouldn't be called `fastapi`. The underlying package doesn't matter.

Reply from @ion-elgreco (Nov 4, 2025): We are open to better name suggestions! Internally we called this differently.
@@ -0,0 +1,9 @@
from pydantic_ai.fastapi.api.completions import AgentChatCompletionsAPI
from pydantic_ai.fastapi.api.models import AgentModelsAPI
from pydantic_ai.fastapi.api.responses import AgentResponsesAPI

__all__ = [
'AgentChatCompletionsAPI',
'AgentModelsAPI',
'AgentResponsesAPI',
]
121 changes: 121 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/api/completions.py
@@ -0,0 +1,121 @@
from collections.abc import AsyncGenerator

try:
from fastapi import HTTPException
from openai.types import ErrorObject
from openai.types.chat.chat_completion import ChatCompletion
    from openai.types.chat.chat_completion_chunk import ChatCompletionChunk, Choice as ChunkChoice, ChoiceDelta
except ImportError as _import_error: # pragma: no cover
raise ImportError(
        'Please install the `openai` and `fastapi` packages to enable the OpenAI-compatible FastAPI endpoints. '
        'You can use the `chat-completion` optional group: `pip install "pydantic-ai-slim[chat-completion]"`'
) from _import_error

from pydantic import TypeAdapter

from pydantic_ai import Agent, _utils
from pydantic_ai.fastapi.convert import (
openai_chat_completions_2pai,
pai_result_to_openai_completions,
)
from pydantic_ai.fastapi.data_models import ChatCompletionRequest, ErrorResponse
from pydantic_ai.fastapi.registry import AgentRegistry
from pydantic_ai.settings import ModelSettings


class AgentChatCompletionsAPI:
"""Chat completions API openai <-> pydantic-ai conversion."""

def __init__(self, registry: AgentRegistry) -> None:
self.registry = registry

def get_agent(self, name: str) -> Agent:
"""Retrieves agent."""
try:
agent = self.registry.get_completions_agent(name)
except KeyError:
raise HTTPException(
status_code=404,
detail=ErrorResponse(
error=ErrorObject(
                        message=f'Model {name} is not available via the chat completions API',
type='not_found_error',
),
).model_dump(),
)

return agent

async def create_completion(self, request: ChatCompletionRequest) -> ChatCompletion:
"""Create a non-streaming chat completion."""
model_name = request.model
agent = self.get_agent(model_name)

model_settings_ta = TypeAdapter(ModelSettings)
messages = openai_chat_completions_2pai(messages=request.messages)

async with agent:
result = await agent.run(
message_history=messages,
model_settings=model_settings_ta.validate_python(
{k: v for k, v in request.model_dump().items() if v is not None},
),
)

return pai_result_to_openai_completions(
result=result,
model=model_name,
)

    async def create_streaming_completion(self, request: ChatCompletionRequest) -> AsyncGenerator[str]:
        """Create a streaming chat completion."""
        model_name = request.model
        agent = self.get_agent(model_name)
        messages = openai_chat_completions_2pai(messages=request.messages)

        # Every chunk of a single stream must carry the same id and creation time.
        stream_id = f'chatcmpl-{_utils.now_utc().isoformat()}'
        created = int(_utils.now_utc().timestamp())
        role_sent = False

        async with (
            agent,
            agent.run_stream(
                message_history=messages,
            ) as result,
        ):
            async for chunk in result.stream_text(delta=True):
                # The assistant role is only announced on the first chunk.
                delta = ChoiceDelta(
                    role='assistant' if not role_sent else None,
                    content=chunk,
                )
                role_sent = True

                stream_response = ChatCompletionChunk(
                    id=stream_id,
                    created=created,
                    model=model_name,
                    object='chat.completion.chunk',
                    choices=[
                        ChunkChoice(
                            index=0,
                            delta=delta,
                        ),
                    ],
                )

                yield f'data: {stream_response.model_dump_json()}\n\n'

        # Terminal chunk: empty delta with finish_reason='stop'.
        final_chunk = ChatCompletionChunk(
            id=stream_id,
            created=created,
            model=model_name,
            object='chat.completion.chunk',
            choices=[
                ChunkChoice(
                    index=0,
                    delta=ChoiceDelta(),
                    finish_reason='stop',
                ),
            ],
        )
        yield f'data: {final_chunk.model_dump_json()}\n\n'
        # OpenAI-compatible clients expect a [DONE] sentinel to close the stream.
        yield 'data: [DONE]\n\n'
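
As a sanity check on the SSE framing above, this is how the endpoint could be consumed with the stock OpenAI client; the base URL, API key, and model name are placeholder assumptions.

```python
from openai import OpenAI

# The router serves /v1/chat/completions, so the standard client can be
# pointed straight at it (base_url and api_key are illustrative).
client = OpenAI(base_url='http://localhost:8000/v1', api_key='unused')

stream = client.chat.completions.create(
    model='my-agent',  # must match an agent name known to the registry
    messages=[{'role': 'user', 'content': 'Hello!'}],
    stream=True,
)
for chunk in stream:
    delta = chunk.choices[0].delta.content
    if delta is not None:
        print(delta, end='', flush=True)
```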
54 changes: 54 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/api/models.py
@@ -0,0 +1,54 @@
import time

try:
from fastapi import HTTPException
from openai.types import ErrorObject
from openai.types.model import Model
except ImportError as _import_error: # pragma: no cover
raise ImportError(
        'Please install the `openai` and `fastapi` packages to enable the OpenAI-compatible FastAPI endpoints. '
        'You can use the `chat-completion` optional group: `pip install "pydantic-ai-slim[chat-completion]"`'
) from _import_error

from pydantic_ai.fastapi.data_models import (
ErrorResponse,
ModelsResponse,
)
from pydantic_ai.fastapi.registry import AgentRegistry


class AgentModelsAPI:
"""Models API for pydantic-ai agents."""

def __init__(self, registry: AgentRegistry) -> None:
self.registry = registry

async def list_models(self) -> ModelsResponse:
"""List available models (OpenAI-compatible endpoint)."""
agents = self.registry.all_agents

models = [
Model(
id=name,
object='model',
created=int(time.time()),
owned_by='model_owner',
)
for name in agents
]
return ModelsResponse(data=models)

async def get_model(self, name: str) -> Model:
"""Get information about a specific model (OpenAI-compatible endpoint)."""
if name in self.registry.all_agents:
            return Model(id=name, object='model', created=int(time.time()), owned_by='model_owner')
else:
raise HTTPException(
status_code=404,
detail=ErrorResponse(
error=ErrorObject(
type='not_found_error',
message=f"Model '{name}' not found",
),
).model_dump(),
)
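
These two endpoints line up with the OpenAI client's model helpers; a short sketch, reusing the placeholder base URL from above:

```python
from openai import OpenAI

client = OpenAI(base_url='http://localhost:8000/v1', api_key='unused')

# GET /v1/models: every registered agent is listed as a Model.
for model in client.models.list():
    print(model.id)

# GET /v1/models/{model_id}: unknown names come back as a 404 ErrorResponse.
print(client.models.retrieve('my-agent'))
```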
68 changes: 68 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/api/responses.py
@@ -0,0 +1,68 @@
from collections.abc import AsyncGenerator

try:
from fastapi import HTTPException
from openai.types import ErrorObject
from openai.types.responses import Response
except ImportError as _import_error: # pragma: no cover
raise ImportError(
        'Please install the `openai` and `fastapi` packages to enable the OpenAI-compatible FastAPI endpoints. '
        'You can use the `chat-completion` optional group: `pip install "pydantic-ai-slim[chat-completion]"`'
) from _import_error

from pydantic_ai import Agent
from pydantic_ai.fastapi.convert import (
openai_responses_input_to_pai,
pai_result_to_openai_responses,
)
from pydantic_ai.fastapi.data_models import ErrorResponse, ResponsesRequest
from pydantic_ai.fastapi.registry import AgentRegistry
from pydantic_ai.models.openai import (
OpenAIResponsesModelSettings,
)


class AgentResponsesAPI:
"""Responses API openai <-> pydantic-ai conversion."""

def __init__(self, registry: AgentRegistry) -> None:
self.registry = registry

def get_agent(self, name: str) -> Agent:
"""Retrieves agent."""
try:
agent = self.registry.get_responses_agent(name)
except KeyError:
raise HTTPException(
status_code=404,
detail=ErrorResponse(
error=ErrorObject(
                        message=f'Model {name} is not available via the responses API',
type='not_found_error',
),
).model_dump(),
)

return agent

async def create_response(self, request: ResponsesRequest) -> Response:
"""Create a non-streaming chat completion."""
model_name = request.model
agent = self.get_agent(model_name)

model_settings = OpenAIResponsesModelSettings(openai_previous_response_id='auto')
messages = openai_responses_input_to_pai(items=request.input)

async with agent:
result = await agent.run(
message_history=messages,
model_settings=model_settings,
)
return pai_result_to_openai_responses(
result=result,
model=model_name,
)

async def create_streaming_response(self, request: ResponsesRequest) -> AsyncGenerator[str]:
"""Create a streaming chat completion."""
raise NotImplementedError
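
Since streaming is not implemented yet (the router returns 501 when `stream=True`), a client sketch for the non-streaming path only, with placeholder base URL and model name as before:

```python
from openai import OpenAI

client = OpenAI(base_url='http://localhost:8000/v1', api_key='unused')

# Non-streaming only: the router rejects stream=True with a 501.
response = client.responses.create(
    model='my-agent',
    input='Hello!',
)
print(response.output_text)
```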
13 changes: 13 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/convert/__init__.py
@@ -0,0 +1,13 @@
from pydantic_ai.fastapi.convert.convert_messages import (
openai_chat_completions_2pai,
openai_responses_input_to_pai,
pai_result_to_openai_completions,
pai_result_to_openai_responses,
)

__all__ = [
'openai_chat_completions_2pai',
'openai_responses_input_to_pai',
'pai_result_to_openai_completions',
'pai_result_to_openai_responses',
]