Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ async def register_manual(self, caller, manual_call_template: CallTemplate) -> R
utcp_manual = UtcpManualSerializer().validate_dict(response_data)
else:
logger.info(f"Assuming OpenAPI spec from '{manual_call_template.name}'. Converting to UTCP manual.")
converter = OpenApiConverter(response_data, spec_url=manual_call_template.url, call_template_name=manual_call_template.name)
converter = OpenApiConverter(response_data, spec_url=manual_call_template.url, call_template_name=manual_call_template.name, inherited_auth=manual_call_template.auth)
utcp_manual = converter.convert()

return RegisterManualResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class OpenApiConverter:
call_template_name: Normalized name for the call_template derived from the spec.
"""

def __init__(self, openapi_spec: Dict[str, Any], spec_url: Optional[str] = None, call_template_name: Optional[str] = None):
def __init__(self, openapi_spec: Dict[str, Any], spec_url: Optional[str] = None, call_template_name: Optional[str] = None, inherited_auth: Optional[Auth] = None):
"""Initializes the OpenAPI converter.

Args:
Expand All @@ -96,9 +96,12 @@ def __init__(self, openapi_spec: Dict[str, Any], spec_url: Optional[str] = None,
Used for base URL determination if servers are not specified.
call_template_name: Optional custom name for the call_template if
the specification title is not provided.
inherited_auth: Optional auth configuration inherited from the manual call template.
Used instead of generating placeholders when x-utcp-auth is present.
"""
self.spec = openapi_spec
self.spec_url = spec_url
self.inherited_auth = inherited_auth
# Single counter for all placeholder variables
self.placeholder_counter = 0
if call_template_name is None:
Expand Down Expand Up @@ -161,16 +164,41 @@ def convert(self) -> UtcpManual:
def _extract_auth(self, operation: Dict[str, Any]) -> Optional[Auth]:
"""
Extracts authentication information from OpenAPI operation and global security schemes."""

# First check for x-utcp-auth extension
if "x-utcp-auth" in operation:
utcp_auth = operation.get("x-utcp-auth") if isinstance(operation.get("x-utcp-auth"), dict) else {}
auth_type = utcp_auth.get("auth_type")

if auth_type == "api_key":
# Use inherited auth if available and compatible, otherwise create placeholder
inherited_var_name = self.inherited_auth.var_name.lower() if self.inherited_auth and self.inherited_auth.var_name else ""
utcp_var_name = utcp_auth.get("var_name", "Authorization").lower()
if (self.inherited_auth and
isinstance(self.inherited_auth, ApiKeyAuth) and
inherited_var_name == utcp_var_name and
self.inherited_auth.location == utcp_auth.get("location", "header")):
return self.inherited_auth
else:
api_key_placeholder = self._get_placeholder("API_KEY")
self._increment_placeholder_counter()
return ApiKeyAuth(
api_key=api_key_placeholder,
var_name=utcp_auth.get("var_name", "Authorization"),
location=utcp_auth.get("location", "header")
)

# Then fall back to standard OpenAPI security schemes
# First check for operation-level security requirements
security_requirements = operation.get("security", [])

# If no operation-level security, check global security requirements
if not security_requirements:
security_requirements = self.spec.get("security", [])

# If no security requirements, return None
# If no security requirements, return inherited auth if available
if not security_requirements:
return None
return self.inherited_auth

# Get security schemes - support both OpenAPI 2.0 and 3.0
security_schemes = self._get_security_schemes()
Expand Down
Loading
Loading