Epic - Structured JSON Logging with Correlation IDs
Title: Structured JSON Logging with Correlation IDs
Goal: Implement comprehensive structured JSON logging with correlation ID tracking across all MCP Gateway components to enable advanced observability, debugging, and performance analysis
Why now: As the MCP Gateway scales with multiple protocols (MCP, A2A), marketplace features, and enterprise deployments, robust logging and tracing become critical for operational excellence, debugging complex workflows, and meeting enterprise observability requirements.
User Stories & Acceptance Criteria
Story 1 – Correlation ID Generation & Propagation
As a DevOps Engineer
I want every request to have a unique correlation ID that follows the request through all system components
So that I can trace complete request flows across services and identify issues quickly.
Acceptance Criteria
Scenario: Automatic correlation ID generation
Given a new request arrives at the MCP Gateway
When the request doesn't contain an existing correlation ID
Then the system generates a new UUID correlation ID
And includes it in the response headers as "X-Correlation-ID"
And propagates it to all downstream service calls
And logs it in all related log entries
And maintains the correlation ID throughout the request lifecycle
Scenario: Correlation ID preservation from clients
Given a client request includes an "X-Correlation-ID" header
When the request is processed by the gateway
Then the existing correlation ID is preserved and used
And all log entries include the client-provided correlation ID
And downstream services receive the same correlation ID
And the response includes the same correlation ID in headers
Scenario: Cross-service correlation tracking
Given a request involves multiple internal services (tool invocation, authentication, caching)
When the request flows through these services
Then each service logs entries with the same correlation ID
And service-to-service calls propagate the correlation ID
And timing information is tracked per service
And the complete request flow can be reconstructed from logs
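A minimal sketch of the propagation behaviour described in Story 1, assuming Python contextvars for per-request state and httpx for outbound calls (all names here are illustrative, not the gateway's existing API):

import uuid
from contextvars import ContextVar
from typing import Optional

import httpx

# Holds the correlation ID for the request currently being handled.
correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")

def get_or_create_correlation_id(incoming: Optional[str]) -> str:
    """Reuse a client-supplied X-Correlation-ID, otherwise mint a new UUID."""
    cid = incoming or str(uuid.uuid4())
    correlation_id_var.set(cid)
    return cid

def outbound_client() -> httpx.Client:
    """HTTP client whose outgoing requests carry the current correlation ID."""
    def _inject(request: httpx.Request) -> None:
        cid = correlation_id_var.get()
        if cid:
            request.headers["X-Correlation-ID"] = cid
    return httpx.Client(event_hooks={"request": [_inject]})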
Story 2 – Structured JSON Log Format
As a Platform Engineer
I want all log entries to use a consistent, structured JSON format
So that logs can be easily parsed, searched, and analyzed by log aggregation tools.
Acceptance Criteria
Scenario: Consistent JSON log structure
Given the MCP Gateway is processing requests
When any component writes log entries
Then all logs follow a consistent JSON schema with fields:
- timestamp (ISO 8601 format)
- level (DEBUG, INFO, WARN, ERROR, CRITICAL)
- correlation_id (UUID)
- component (service/module name)
- message (human-readable message)
- request_id (internal request identifier)
- user_id (if authenticated)
- execution_context (additional contextual data)
And all log entries are valid JSON
And log parsing never fails due to format issues
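An illustrative entry conforming to this schema (all values are examples only):

{
  "timestamp": "2025-01-15T10:23:45.123Z",
  "level": "INFO",
  "correlation_id": "9f1c2e4a-7b3d-4e8f-a1b2-c3d4e5f6a7b8",
  "component": "tool_service",
  "message": "Tool invocation completed",
  "request_id": "req-000123",
  "user_id": "user-42",
  "execution_context": {"tool_name": "example_tool", "execution_time_ms": 184}
}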
Scenario: Contextual log enrichment
Given different types of operations (tool calls, authentication, caching)
When log entries are generated
Then logs include operation-specific context:
- Tool invocations: tool_name, tool_id, parameters_hash, execution_time
- Authentication: auth_method, user_id, success/failure
- A2A operations: server_id, skill_id, task_id, task_state
- Database operations: query_type, execution_time, affected_rows
- Cache operations: cache_key, hit/miss, ttl
And sensitive data is automatically redacted
And context follows consistent naming conventions
Scenario: Error logging with stack traces
Given an error occurs during request processing
When the error is logged
Then the log entry includes:
- Complete stack trace in structured format
- Error code and category
- Input parameters (sanitized)
- System state information
- Recovery actions taken
And error logs can be easily filtered and analyzed
And related log entries can be found via correlation ID
Story 3 – Performance & Timing Analytics
As a Performance Engineer
I want detailed timing information logged for all operations
So that I can identify performance bottlenecks and optimize system performance.
Acceptance Criteria
Scenario: Request timing instrumentation
Given a request is being processed
When the request flows through different components
Then timing information is logged for:
- Total request duration
- Authentication time
- Database query execution time
- Tool invocation duration
- Cache lookup/store time
- External API call duration (MCP servers, A2A servers)
And timing data includes start/end timestamps
And performance metrics can be aggregated by correlation ID
Scenario: Performance threshold alerts
Given performance thresholds are configured
When operations exceed defined thresholds
Then warning logs are generated with:
- Operation type and duration
- Threshold exceeded
- Performance context (load, concurrent requests)
- Suggested optimization actions
And performance alerts can trigger monitoring notifications
And trending analysis can identify performance degradation
Scenario: Resource utilization logging
Given system resource monitoring is enabled
When resource usage changes significantly
Then logs include:
- Memory usage per component
- CPU utilization during operations
- Database connection pool status
- Cache memory consumption
- Active request counts
And resource logs help with capacity planning
And resource exhaustion is detected early
Story 4 – Security & Audit Logging
As a Security Engineer
I want comprehensive security-focused logging with audit trails
So that I can monitor for security threats and maintain compliance audit trails.
Acceptance Criteria
Scenario: Authentication and authorization logging
Given users authenticate and access resources
When authentication/authorization events occur
Then security logs capture:
- Authentication attempts (success/failure)
- Authorization decisions and policy evaluations
- JWT token validation results
- Role and permission changes
- Session creation and termination
- Failed access attempts with details
And security logs are tamper-evident
And sensitive information is properly redacted
Scenario: Data access audit trail
Given users access tools, resources, and data
When data access operations occur
Then audit logs record:
- Data accessed (type, identifier, size)
- Access method and purpose
- Data modifications or exports
- Tool invocations with input/output summaries
- A2A task executions and data flows
And audit logs support compliance reporting
And data lineage can be reconstructed from logs
Scenario: Security incident detection
Given potential security threats or anomalies
When suspicious patterns are detected
Then security alert logs include:
- Anomaly type and severity
- Attack patterns or indicators
- Affected resources and users
- Automatic mitigation actions taken
- Recommended manual interventions
And security logs integrate with SIEM systems
And incident response is accelerated by detailed logging
Story 5 – Log Aggregation & Search Interface
As a System Administrator
I want an integrated log search and analysis interface
So that I can quickly find and analyze log data without external tools.
Acceptance Criteria
Scenario: Built-in log search interface
Given I access the Admin UI log section
When I want to search logs
Then I can:
- Search by correlation ID to see complete request flows
- Filter by time range, log level, and component
- Search by user ID, tool name, or error type
- Use structured queries on JSON log fields
- Export search results in multiple formats
And search results are paginated and performant
And real-time log streaming is available
Scenario: Log visualization and analytics
Given log data is available for analysis
When I view log analytics
Then I can see:
- Request volume and patterns over time
- Error rates and trending
- Performance metrics and distributions
- Top errors and their frequencies
- User activity patterns
- Tool usage analytics
And visualizations update in real-time
And custom dashboards can be created
Scenario: Correlation ID workflow tracing
Given a correlation ID from a problematic request
When I trace the workflow
Then I can see:
- Complete chronological request flow
- Service boundaries and handoffs
- Timing breakdowns by component
- Error points and recovery attempts
- Related log entries from all services
And workflow traces can be shared with team members
And trace analysis suggests optimization opportunities
Story 6 – External Integration & Export
As a Platform Engineer
I want to integrate with external logging and monitoring systems
So that MCP Gateway logs fit into our existing observability infrastructure.
Acceptance Criteria
Scenario: Multiple log output destinations
Given external logging systems are configured
When logs are generated
Then logs can be sent to:
- Local files with rotation and compression
- Syslog servers (RFC 5424 compliant)
- Elasticsearch/OpenSearch clusters
- Splunk via HTTP Event Collector
- Cloud logging services (AWS CloudWatch, Azure Monitor, GCP Logging)
- Custom webhook endpoints
And multiple destinations can be active simultaneously
And log delivery failures are handled gracefully
Scenario: Standards-compliant log formats
Given integration with external systems
When logs are exported
Then logs support standard formats:
- OpenTelemetry trace format
- Common Event Format (CEF)
- GELF (Graylog Extended Log Format)
- ECS (Elastic Common Schema)
- Custom JSON schemas
And log formats can be configured per destination
And format conversion doesn't lose data
Scenario: Log shipping reliability
Given external log destinations
When network issues or destination failures occur
Then the system:
- Buffers logs locally during outages
- Retries failed deliveries with backoff
- Provides dead letter queues for undeliverable logs
- Monitors log shipping health and success rates
- Alerts on persistent delivery failures
And log shipping doesn't impact application performance
And log data integrity is maintained
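A hedged sketch of the buffering-and-retry behaviour described in this scenario (class and method names are illustrative, not part of the existing codebase):

import random
import time
from collections import deque
from typing import Any, Callable, Dict

class BufferedLogShipper:
    """Buffers entries locally and retries delivery with exponential backoff."""
    def __init__(self, send_fn: Callable[[Dict[str, Any]], None], max_buffer: int = 10_000, max_retries: int = 5):
        self.send_fn = send_fn                  # delivers one entry to the external destination
        self.buffer = deque(maxlen=max_buffer)  # a fuller design would spill overflow to a dead-letter queue
        self.max_retries = max_retries

    def ship(self, entry: Dict[str, Any]) -> None:
        self.buffer.append(entry)
        self.flush()

    def flush(self) -> None:
        while self.buffer:
            entry = self.buffer[0]
            for attempt in range(self.max_retries):
                try:
                    self.send_fn(entry)
                    self.buffer.popleft()
                    break
                except Exception:
                    # exponential backoff with jitter before the next attempt
                    time.sleep(min(2 ** attempt, 30) + random.random())
            else:
                return  # destination still unreachable; keep the entry buffered for the next flush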
Architecture
1 – Structured Logging Architecture
flowchart TD
subgraph "MCP Gateway Core"
REQ[Incoming Request] --> CIG[Correlation ID Generator]
CIG --> CTX[Request Context]
CTX --> COMP[Gateway Components]
end
subgraph "Logging Infrastructure"
COMP --> LM[Log Manager]
LM --> LF[Log Formatter]
LF --> LE[Log Enricher]
LE --> LR[Log Router]
end
subgraph "Gateway Components"
COMP --> AUTH[Auth Service]
COMP --> TOOL[Tool Service]
COMP --> A2A[A2A Service]
COMP --> CACHE[Cache Service]
COMP --> DB[Database Layer]
AUTH --> LM
TOOL --> LM
A2A --> LM
CACHE --> LM
DB --> LM
end
subgraph "Log Processing"
LE --> SEC[Security Filter]
LE --> PERF[Performance Analyzer]
LE --> AUDIT[Audit Processor]
SEC --> LR
PERF --> LR
AUDIT --> LR
end
subgraph "Log Destinations"
LR --> LOCAL[Local Files]
LR --> SYSLOG[Syslog Server]
LR --> ELK[Elasticsearch]
LR --> CLOUD[Cloud Logging]
LR --> WEBHOOK[Custom Webhooks]
LR --> METRICS[Metrics Export]
end
subgraph "Log Analysis"
LOCAL --> LSI[Log Search Interface]
ELK --> LSI
LSI --> DASH[Admin Dashboard]
LSI --> ALERT[Alert Manager]
LSI --> VIZ[Visualization Engine]
end
subgraph "Monitoring Integration"
METRICS --> PROM[Prometheus]
ALERT --> NOTIF[Notification System]
PERF --> APM[APM Integration]
end
classDef core fill:#e3f2fd,stroke:#1976d2;
classDef logging fill:#f3e5f5,stroke:#7b1fa2;
classDef processing fill:#e8f5e8,stroke:#388e3c;
classDef destinations fill:#fff3e0,stroke:#f57c00;
classDef analysis fill:#fce4ec,stroke:#c2185b;
class CIG,CTX,COMP core
class LM,LF,LE,LR logging
class SEC,PERF,AUDIT processing
class LOCAL,SYSLOG,ELK,CLOUD,WEBHOOK destinations
class LSI,DASH,ALERT,VIZ analysis
2 – Correlation ID Flow & Request Tracing
sequenceDiagram
participant CLIENT as Client
participant GATEWAY as Gateway
participant AUTH as Auth Service
participant TOOL as Tool Service
participant A2A as A2A Service
participant EXT as External Server
participant LOGS as Log System
CLIENT ->> GATEWAY: HTTP Request
GATEWAY ->> GATEWAY: Generate/Extract Correlation ID
GATEWAY ->> LOGS: Log: Request Started (correlation_id, user_agent, path)
GATEWAY ->> AUTH: Authenticate (correlation_id)
AUTH ->> LOGS: Log: Auth Attempt (correlation_id, user_id, method)
AUTH ->> AUTH: Validate Credentials
AUTH ->> LOGS: Log: Auth Success (correlation_id, user_id, duration)
AUTH -->> GATEWAY: Authentication Result
GATEWAY ->> TOOL: Invoke Tool (correlation_id, tool_name)
TOOL ->> LOGS: Log: Tool Invocation Start (correlation_id, tool_id, parameters_hash)
alt Tool Type: A2A
TOOL ->> A2A: Execute A2A Task (correlation_id)
A2A ->> LOGS: Log: A2A Task Created (correlation_id, server_id, task_id)
A2A ->> EXT: A2A Protocol Call (X-Correlation-ID header)
EXT -->> A2A: A2A Response
A2A ->> LOGS: Log: A2A Task Completed (correlation_id, task_id, duration, status)
A2A -->> TOOL: A2A Result
else Tool Type: MCP
TOOL ->> EXT: MCP Tool Call (correlation_id in context)
EXT -->> TOOL: MCP Response
end
TOOL ->> LOGS: Log: Tool Invocation Complete (correlation_id, tool_id, duration, success)
TOOL -->> GATEWAY: Tool Result
GATEWAY ->> LOGS: Log: Request Completed (correlation_id, status_code, total_duration)
GATEWAY -->> CLIENT: HTTP Response (X-Correlation-ID header)
Enhanced Design Sketch
Structured Logging System Architecture:
# Assumed imports for the sketches below (standard library plus psutil and FastAPI; supporting
# types such as LogEntry, RequestContext, LogLevel, SecuritySeverity, and settings are assumed
# to be defined elsewhere in the module).
import os
import socket
import statistics
import threading
import time
import uuid
from collections import defaultdict
from contextlib import contextmanager
from datetime import datetime
from typing import Any, Dict, List, Optional

import psutil
from fastapi import FastAPI, Request, Response

class StructuredLogger:
"""Main structured logging system with correlation ID support"""
def __init__(self):
self.log_formatter = JSONLogFormatter()
self.log_enricher = LogEnricher()
self.log_router = LogRouter()
self.correlation_tracker = CorrelationTracker()
self.performance_tracker = PerformanceTracker()
self.security_logger = SecurityLogger()
def get_logger(self, component_name: str) -> ComponentLogger:
"""Get a component-specific logger with correlation support"""
return ComponentLogger(
component_name=component_name,
formatter=self.log_formatter,
enricher=self.log_enricher,
router=self.log_router,
correlation_tracker=self.correlation_tracker
)
def start_request_context(
self,
request: Request,
user_context: Optional[UserContext] = None
) -> RequestContext:
"""Initialize request context with correlation ID"""
# Extract or generate correlation ID
correlation_id = self._extract_or_generate_correlation_id(request)
# Create request context
context = RequestContext(
correlation_id=correlation_id,
request_id=self._generate_request_id(),
user_id=user_context.user_id if user_context else None,
start_time=datetime.utcnow(),
request_path=request.url.path,
request_method=request.method,
user_agent=request.headers.get("user-agent"),
client_ip=self._extract_client_ip(request)
)
# Store in context manager
self.correlation_tracker.set_context(context)
# Log request start
self._log_request_start(context)
return context
def end_request_context(
self,
context: RequestContext,
response: Response
):
"""Complete request context and log summary"""
context.end_time = datetime.utcnow()
context.duration = (context.end_time - context.start_time).total_seconds()
context.status_code = response.status_code
context.response_size = len(response.body) if hasattr(response, 'body') else None
# Log request completion
self._log_request_complete(context)
# Clear context
self.correlation_tracker.clear_context()
def _extract_or_generate_correlation_id(self, request: Request) -> str:
"""Extract correlation ID from headers or generate new one"""
# Check for existing correlation ID in headers
correlation_id = request.headers.get("x-correlation-id")
if correlation_id:
return correlation_id
# Generate new UUID-based correlation ID
return str(uuid.uuid4())
class ComponentLogger:
"""Component-specific logger with automatic context enrichment"""
def __init__(
self,
component_name: str,
formatter: JSONLogFormatter,
enricher: LogEnricher,
router: LogRouter,
correlation_tracker: CorrelationTracker
):
self.component_name = component_name
self.formatter = formatter
self.enricher = enricher
self.router = router
self.correlation_tracker = correlation_tracker
def info(
self,
message: str,
extra_context: Dict[str, Any] = None,
**kwargs
):
"""Log info-level message with context"""
self._log(LogLevel.INFO, message, extra_context, **kwargs)
def error(
self,
message: str,
error: Exception = None,
extra_context: Dict[str, Any] = None,
**kwargs
):
"""Log error-level message with exception details"""
context = extra_context or {}
if error:
context.update({
"error_type": type(error).__name__,
"error_message": str(error),
"stack_trace": self._format_stack_trace(error)
})
self._log(LogLevel.ERROR, message, context, **kwargs)
def performance(
self,
operation_name: str,
duration: float,
extra_context: Dict[str, Any] = None,
**kwargs
):
"""Log performance timing information"""
context = extra_context or {}
context.update({
"operation": operation_name,
"duration_ms": duration * 1000,
"performance_category": self._categorize_performance(operation_name, duration)
})
# Determine log level based on performance thresholds
level = self._determine_performance_log_level(operation_name, duration)
self._log(level, f"Performance: {operation_name}", context, **kwargs)
def security(
self,
event_type: str,
message: str,
severity: SecuritySeverity,
extra_context: Dict[str, Any] = None,
**kwargs
):
"""Log security-related events"""
context = extra_context or {}
context.update({
"security_event_type": event_type,
"security_severity": severity.value,
"requires_attention": severity in [SecuritySeverity.HIGH, SecuritySeverity.CRITICAL]
})
level = self._map_security_severity_to_log_level(severity)
self._log(level, message, context, security=True, **kwargs)
def _log(
self,
level: LogLevel,
message: str,
extra_context: Dict[str, Any] = None,
security: bool = False,
**kwargs
):
"""Internal logging method with full context enrichment"""
# Get current request context
request_context = self.correlation_tracker.get_context()
# Build base log entry
log_entry = LogEntry(
timestamp=datetime.utcnow(),
level=level,
component=self.component_name,
message=message,
correlation_id=request_context.correlation_id if request_context else None,
request_id=request_context.request_id if request_context else None,
user_id=request_context.user_id if request_context else None
)
# Add extra context
if extra_context:
log_entry.context.update(extra_context)
# Add any additional kwargs
log_entry.context.update(kwargs)
# Enrich with system context
enriched_entry = self.enricher.enrich(log_entry, security=security)
# Format to JSON
formatted_entry = self.formatter.format(enriched_entry)
# Route to configured destinations
self.router.route(formatted_entry, level, security)
class JSONLogFormatter:
"""Formats log entries as structured JSON"""
def __init__(self):
self.sensitive_fields = {
"password", "token", "secret", "key", "auth", "credential",
"authorization", "x-api-key", "bearer"
}
def format(self, log_entry: LogEntry) -> Dict[str, Any]:
"""Format log entry as structured JSON"""
formatted = {
"@timestamp": log_entry.timestamp.isoformat() + "Z",
"level": log_entry.level.value,
"component": log_entry.component,
"message": log_entry.message,
"correlation_id": log_entry.correlation_id,
"request_id": log_entry.request_id,
"user_id": log_entry.user_id,
"hostname": socket.gethostname(),
"process_id": os.getpid(),
"thread_id": threading.current_thread().ident
}
# Add context with sensitive data redaction
if log_entry.context:
formatted["context"] = self._redact_sensitive_data(log_entry.context)
# Add performance metrics if available
if hasattr(log_entry, 'performance_metrics'):
formatted["performance"] = log_entry.performance_metrics
# Add security context if this is a security log
if hasattr(log_entry, 'security_context'):
formatted["security"] = log_entry.security_context
return formatted
def _redact_sensitive_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Redact sensitive information from log data"""
redacted = {}
for key, value in data.items():
key_lower = key.lower()
if any(sensitive in key_lower for sensitive in self.sensitive_fields):
redacted[key] = "[REDACTED]"
elif isinstance(value, dict):
redacted[key] = self._redact_sensitive_data(value)
elif isinstance(value, list):
redacted[key] = [
self._redact_sensitive_data(item) if isinstance(item, dict) else item
for item in value
]
else:
redacted[key] = value
return redacted
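For example, nested structures are walked and any key that matches a sensitive pattern is masked before the value is serialized:

formatter = JSONLogFormatter()
formatter._redact_sensitive_data({"user": "alice", "auth": {"token": "abc123"}})
# -> {"user": "alice", "auth": "[REDACTED]"}  (the matching key is masked before recursion)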
class LogEnricher:
"""Enriches log entries with additional context and metadata"""
def enrich(self, log_entry: LogEntry, security: bool = False) -> LogEntry:
"""Enrich log entry with system and contextual information"""
# Add system information
log_entry.context.update({
"version": settings.app_version,
"environment": settings.environment,
"service_name": settings.app_name,
"instance_id": settings.instance_id
})
# Add memory and performance context
if settings.log_system_metrics:
log_entry.context.update({
"memory_usage_mb": psutil.Process().memory_info().rss / 1024 / 1024,
"cpu_percent": psutil.Process().cpu_percent(),
"active_connections": self._get_active_connection_count()
})
# Add security-specific enrichment
if security:
log_entry.security_context = {
"ip_reputation": self._check_ip_reputation(log_entry.context.get("client_ip")),
"geo_location": self._get_geo_location(log_entry.context.get("client_ip")),
"threat_indicators": self._analyze_threat_indicators(log_entry)
}
return log_entry
class LogRouter:
"""Routes formatted log entries to configured destinations"""
def __init__(self):
self.destinations = []
self.buffer_manager = LogBufferManager()
self.delivery_manager = LogDeliveryManager()
def add_destination(self, destination: LogDestination):
"""Add a log destination"""
self.destinations.append(destination)
def route(self, formatted_entry: Dict[str, Any], level: LogLevel, security: bool = False):
"""Route log entry to appropriate destinations"""
for destination in self.destinations:
# Check if destination should receive this log level
if not destination.should_accept(level, security):
continue
try:
# Attempt immediate delivery
destination.send(formatted_entry)
except Exception as e:
# Buffer for retry if immediate delivery fails
self.buffer_manager.buffer_log(destination, formatted_entry)
logger.warning(f"Log delivery failed to {destination.name}, buffering: {e}")
# Process any buffered logs
self.delivery_manager.process_buffered_logs()
class PerformanceTracker:
"""Tracks and analyzes performance metrics across requests"""
def __init__(self):
self.operation_timings = defaultdict(list)
self.performance_thresholds = {
"database_query": 0.1,
"tool_invocation": 2.0,
"authentication": 0.5,
"cache_operation": 0.01,
"a2a_task": 5.0
}
@contextmanager
def track_operation(self, operation_name: str, logger: ComponentLogger):
"""Context manager to track operation performance"""
start_time = time.time()
try:
yield
finally:
duration = time.time() - start_time
# Record timing
self.operation_timings[operation_name].append(duration)
# Log performance
logger.performance(operation_name, duration, {
"threshold_exceeded": duration > self.performance_thresholds.get(operation_name, float('inf'))
})
def get_performance_summary(self) -> Dict[str, Any]:
"""Get performance summary for analytics"""
summary = {}
for operation, timings in self.operation_timings.items():
if timings:
summary[operation] = {
"count": len(timings),
"avg_duration": statistics.mean(timings),
"p50_duration": statistics.median(timings),
"p95_duration": statistics.quantiles(timings, n=20)[18] if len(timings) >= 20 else max(timings),
"max_duration": max(timings),
"min_duration": min(timings)
}
return summary
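Usage inside a service method might look like this (the structured_logger instance and run_query() call are assumed for illustration):

perf_tracker = PerformanceTracker()
db_logger = structured_logger.get_logger("database")

with perf_tracker.track_operation("database_query", db_logger):
    rows = run_query()  # placeholder for the real database call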
class SecurityLogger:
"""Specialized logger for security events and audit trails"""
def __init__(self, component_logger: ComponentLogger):
self.logger = component_logger
self.threat_detector = ThreatDetector()
self.audit_processor = AuditProcessor()
def log_authentication_attempt(
self,
user_id: str,
auth_method: str,
success: bool,
client_ip: str,
additional_context: Dict[str, Any] = None
):
"""Log authentication attempts with security analysis"""
context = {
"user_id": user_id,
"auth_method": auth_method,
"success": success,
"client_ip": client_ip,
"failed_attempts_recent": self._count_recent_failures(user_id, client_ip),
**(additional_context or {})
}
# Analyze for threats
threat_level = self.threat_detector.analyze_auth_attempt(context)
severity = SecuritySeverity.LOW
if threat_level > 0.7:
    severity = SecuritySeverity.CRITICAL
elif not success and context["failed_attempts_recent"] > 5:
    severity = SecuritySeverity.HIGH
self.logger.security(
"authentication_attempt",
f"Authentication {'successful' if success else 'failed'} for user {user_id}",
severity,
context
)
def log_data_access(
self,
user_id: str,
resource_type: str,
resource_id: str,
access_type: str,
data_classification: str = None
):
"""Log data access for audit trails"""
context = {
"user_id": user_id,
"resource_type": resource_type,
"resource_id": resource_id,
"access_type": access_type,
"data_classification": data_classification,
"audit_required": data_classification in ["sensitive", "confidential", "restricted"]
}
# Process audit requirements
self.audit_processor.process_data_access(context)
self.logger.security(
"data_access",
f"User {user_id} accessed {resource_type} {resource_id}",
SecuritySeverity.LOW,
context
)
# Integration with existing services
class LoggingMiddleware:
"""FastAPI middleware for automatic request logging"""
def __init__(self, app: FastAPI):
self.app = app
self.structured_logger = StructuredLogger()
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
request = Request(scope, receive)
# Start request context
context = self.structured_logger.start_request_context(request)
async def send_wrapper(message):
if message["type"] == "http.response.start":
# Create response object for logging
response = Response(status_code=message["status"])
self.structured_logger.end_request_context(context, response)
await send(message)
await self.app(scope, receive, send_wrapper)
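One possible wiring of the middleware and a component logger into the application (illustrative only; the route and helper names are assumptions):

from fastapi import FastAPI

app = FastAPI()
# Starlette constructs the middleware as LoggingMiddleware(inner_asgi_app), which matches the __init__ above
app.add_middleware(LoggingMiddleware)

structured_logger = StructuredLogger()
tool_logger = structured_logger.get_logger("tool_service")

@app.post("/tools/{tool_name}/invoke")
async def invoke_tool(tool_name: str):
    tool_logger.info("Tool invocation requested", {"tool_name": tool_name})
    return {"status": "accepted"}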
Database Schema Extensions:
class LogEntry(Base):
"""Structured log entry storage for analysis and search"""
__tablename__ = "log_entries"
id: Mapped[str] = mapped_column(String, primary_key=True)
timestamp: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
correlation_id: Mapped[Optional[str]] = mapped_column(String, index=True)
request_id: Mapped[Optional[str]] = mapped_column(String, index=True)
# Log metadata
level: Mapped[str] = mapped_column(String, nullable=False, index=True)
component: Mapped[str] = mapped_column(String, nullable=False, index=True)
message: Mapped[str] = mapped_column(Text, nullable=False)
# User and request context
user_id: Mapped[Optional[str]] = mapped_column(String, index=True)
client_ip: Mapped[Optional[str]] = mapped_column(String)
user_agent: Mapped[Optional[str]] = mapped_column(String)
request_path: Mapped[Optional[str]] = mapped_column(String)
request_method: Mapped[Optional[str]] = mapped_column(String)
# Performance data
duration_ms: Mapped[Optional[float]] = mapped_column(Float)
operation_type: Mapped[Optional[str]] = mapped_column(String, index=True)
# Security context
is_security_event: Mapped[bool] = mapped_column(Boolean, default=False, index=True)
security_severity: Mapped[Optional[str]] = mapped_column(String, index=True)
# Structured context data
context: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSON)
error_details: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSON)
performance_metrics: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSON)
# System information
hostname: Mapped[str] = mapped_column(String, nullable=False)
process_id: Mapped[int] = mapped_column(Integer, nullable=False)
version: Mapped[str] = mapped_column(String, nullable=False)
# Indexes for performance
__table_args__ = (
Index('idx_log_entries_correlation_time', 'correlation_id', 'timestamp'),
Index('idx_log_entries_user_time', 'user_id', 'timestamp'),
Index('idx_log_entries_level_time', 'level', 'timestamp'),
Index('idx_log_entries_component_time', 'component', 'timestamp'),
Index('idx_log_entries_security', 'is_security_event', 'security_severity', 'timestamp'),
)
class PerformanceMetric(Base):
"""Aggregated performance metrics from log analysis"""
__tablename__ = "performance_metrics"
id: Mapped[str] = mapped_column(String, primary_key=True)
timestamp: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
# Metric identification
operation_type: Mapped[str] = mapped_column(String, nullable=False, index=True)
component: Mapped[str] = mapped_column(String, nullable=False, index=True)
# Aggregated metrics
request_count: Mapped[int] = mapped_column(Integer, nullable=False)
avg_duration_ms: Mapped[float] = mapped_column(Float, nullable=False)
p50_duration_ms: Mapped[float] = mapped_column(Float, nullable=False)
p95_duration_ms: Mapped[float] = mapped_column(Float, nullable=False)
max_duration_ms: Mapped[float] = mapped_column(Float, nullable=False)
error_count: Mapped[int] = mapped_column(Integer, default=0)
error_rate: Mapped[float] = mapped_column(Float, default=0.0)
# Time window
window_start: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
window_end: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
__table_args__ = (
Index('idx_performance_operation_time', 'operation_type', 'window_start'),
Index('idx_performance_component_time', 'component', 'window_start'),
)
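With the indexes above, reconstructing a request flow is a single ordered query; a sketch in SQLAlchemy 2.0 style (session management is assumed):

from typing import List

from sqlalchemy import select
from sqlalchemy.orm import Session

def trace_request(session: Session, correlation_id: str) -> List[LogEntry]:
    """Return every stored log entry for one correlation ID, oldest first."""
    stmt = (
        select(LogEntry)
        .where(LogEntry.correlation_id == correlation_id)
        .order_by(LogEntry.timestamp)
    )
    return list(session.scalars(stmt))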
Configuration
class StructuredLoggingConfig:
# === Core Logging ===
structured_logging_enabled: bool = True # Enable structured JSON logging
log_level: str = "INFO" # Default log level
correlation_id_header: str = "X-Correlation-ID" # Header name for correlation ID
request_id_generation: bool = True # Generate internal request IDs
# === Log Format ===
json_logging: bool = True # Use JSON format for all logs
timestamp_format: str = "iso8601" # Timestamp format (iso8601, epoch, custom)
timezone: str = "UTC" # Timezone for timestamps
include_hostname: bool = True # Include hostname in logs
include_process_info: bool = True # Include PID and thread info
# === Context Enrichment ===
auto_enrich_context: bool = True # Auto-enrich logs with context
include_system_metrics: bool = False # Include memory/CPU in logs
include_performance_metrics: bool = True # Include timing information
redact_sensitive_data: bool = True # Auto-redact sensitive fields
sensitive_field_patterns: List[str] = [ # Patterns for sensitive data
"password", "token", "secret", "key", "auth", "credential"
]
# === Performance Tracking ===
performance_logging_enabled: bool = True # Enable performance logging
performance_thresholds: Dict[str, float] = { # Operation time thresholds (seconds)
"database_query": 0.1,
"tool_invocation": 2.0,
"authentication": 0.5,
"cache_operation": 0.01,
"a2a_task": 5.0,
"request_total": 10.0
}
log_slow_operations: bool = True # Log operations exceeding thresholds
# === Security Logging ===
security_logging_enabled: bool = True # Enable security event logging
audit_trail_enabled: bool = True # Enable audit trail logging
failed_auth_threshold: int = 5 # Failed auth attempts before alert
log_data_access: bool = True # Log data access events
log_permission_changes: bool = True # Log permission/role changes
# === Log Destinations ===
console_logging: bool = True # Log to console/stdout
file_logging: bool = True # Log to files
file_path: str = "/var/log/mcpgateway/app.log" # Log file path
file_rotation_size: str = "100MB" # File rotation size
file_retention_days: int = 30 # Log file retention
# === External Integrations ===
syslog_enabled: bool = False # Enable syslog output
syslog_server: str = "localhost:514" # Syslog server address
syslog_facility: str = "local0" # Syslog facility
elasticsearch_enabled: bool = False # Enable Elasticsearch output
elasticsearch_url: str = "http://localhost:9200" # Elasticsearch URL
elasticsearch_index: str = "mcpgateway-logs" # Elasticsearch index name
webhook_logging_enabled: bool = False # Enable webhook output
webhook_url: str = "" # Webhook URL for log delivery
webhook_timeout: int = 10 # Webhook timeout (seconds)
cloud_logging_enabled: bool = False # Enable cloud logging
cloud_provider: str = "aws" # Cloud provider (aws, azure, gcp)
cloud_config: Dict[str, Any] = {} # Provider-specific config
# === Log Search & Analytics ===
log_search_enabled: bool = True # Enable built-in log search
log_retention_days: int = 90 # Log retention in database
log_indexing_enabled: bool = True # Enable full-text indexing
real_time_streaming: bool = True # Enable real-time log streaming
# === Performance & Limits ===
log_buffer_size: int = 1000 # Log buffer size before flush
log_flush_interval: int = 10 # Log flush interval (seconds)
max_log_entry_size: int = 65536 # Max size per log entry (bytes)
async_logging: bool = True # Use async logging for performance
# === Alerting ===
log_alerting_enabled: bool = True # Enable log-based alerting
error_rate_threshold: float = 0.05 # Error rate threshold for alerts
performance_degradation_threshold: float = 2.0 # Performance degradation multiplier
alert_webhook_url: str = "" # Webhook for alerts
# === Compliance ===
gdpr_compliance: bool = True # Enable GDPR compliance features
data_retention_policy: str = "90_days" # Data retention policy
log_encryption_at_rest: bool = False # Encrypt logs at rest
log_signing_enabled: bool = False # Enable log integrity signing
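How these settings might feed the log router at startup (FileDestination and ElasticsearchDestination are hypothetical destination classes, shown only to illustrate the wiring):

config = StructuredLoggingConfig()
logging_system = StructuredLogger()

if config.file_logging:
    logging_system.log_router.add_destination(
        FileDestination(path=config.file_path, rotation_size=config.file_rotation_size)  # hypothetical
    )
if config.elasticsearch_enabled:
    logging_system.log_router.add_destination(
        ElasticsearchDestination(url=config.elasticsearch_url, index=config.elasticsearch_index)  # hypothetical
    )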
Admin UI Integration
Log Search & Analysis Interface
<!-- Structured Logging Dashboard -->
<div id="logging-panel" class="tab-panel">
<div class="logging-header">
<h2 class="text-2xl font-bold">Structured Logs & Analytics</h2>
<p class="text-gray-600">Search, analyze, and monitor system logs with correlation tracking</p>
<!-- Log Statistics -->
<div class="log-stats grid grid-cols-5 gap-4 my-4">
<div class="stat-card">
<h3>Log Entries Today</h3>
<span class="stat-number">2.4M</span>
<span class="stat-change">+15% vs yesterday</span>
</div>
<div class="stat-card">
<h3>Error Rate</h3>
<span class="stat-number">0.08%</span>
<span class="stat-status normal">Normal</span>
</div>
<div class="stat-card">
<h3>Avg Response Time</h3>
<span class="stat-number">284ms</span>
<span class="stat-change">-12ms from yesterday</span>
</div>
<div class="stat-card">
<h3>Active Correlations</h3>
<span class="stat-number">1,247</span>
<span class="stat-note">concurrent requests</span>
</div>
<div class="stat-card">
<h3>Security Events</h3>
<span class="stat-number">23</span>
<span class="stat-status attention">Needs attention</span>
</div>
</div>
</div>
<!-- Log Search Interface -->
<div class="log-search-section">
<div class="search-controls">
<div class="search-input-group">
<input type="text" id="log-search-query" placeholder="Search logs by message, correlation ID, user ID..." />
<button onclick="searchLogs()" class="btn btn-primary">Search</button>
<button onclick="clearLogSearch()" class="btn btn-secondary">Clear</button>
</div>
<div class="search-filters">
<select id="log-level-filter">
<option value="">All Levels</option>
<option value="DEBUG">Debug</option>
<option value="INFO">Info</option>
<option value="WARN">Warning</option>
<option value="ERROR">Error</option>
<option value="CRITICAL">Critical</option>
</select>
<select id="component-filter">
<option value="">All Components</option>
<option value="auth_service">Authentication</option>
<option value="tool_service">Tool Service</option>
<option value="a2a_service">A2A Service</option>
<option value="cache_service">Cache Service</option>
<option value="database">Database</option>
</select>
<input type="datetime-local" id="start-time-filter" />
<input type="datetime-local" id="end-time-filter" />
<label>
<input type="checkbox" id="security-events-only" />
Security Events Only
</label>
<label>
<input type="checkbox" id="errors-only" />
Errors Only
</label>
</div>
</div>
<!-- Quick Search Templates -->
<div class="quick-searches">
<h4>Quick Searches</h4>
<div class="quick-search-buttons">
<button onclick="quickSearch('recent-errors')" class="btn btn-sm btn-outline">Recent Errors</button>
<button onclick="quickSearch('slow-operations')" class="btn btn-sm btn-outline">Slow Operations</button>
<button onclick="quickSearch('auth-failures')" class="btn btn-sm btn-outline">Auth Failures</button>
<button onclick="quickSearch('security-alerts')" class="btn btn-sm btn-outline">Security Alerts</button>
<button onclick="quickSearch('high-volume-users')" class="btn btn-sm btn-outline">High Volume Users</button>
</div>
</div>
</div>
<!-- Log Results Display -->
<div class="log-results-section">
<div class="results-header">
<h3>Search Results</h3>
<div class="results-controls">
<span id="results-count">Showing 0 results</span>
<button onclick="exportLogResults()" class="btn btn-sm btn-secondary">Export</button>
<button onclick="toggleRealTimeMode()" class="btn btn-sm btn-secondary" id="realtime-toggle">
Enable Real-time
</button>
</div>
</div>
<div class="log-entries" id="log-entries-container">
<!-- Log entries populated dynamically -->
</div>
<!-- Pagination -->
<div class="log-pagination">
<button onclick="loadLogPage(currentPage - 1)">Previous</button>
<span>Page <span id="log-current-page">1</span> of <span id="log-total-pages">1</span></span>
<button onclick="loadLogPage(currentPage + 1)">Next</button>
</div>
</div>
<!-- Correlation ID Tracing -->
<div class="correlation-tracing-section">
<h3>Correlation ID Tracing</h3>
<div class="tracing-input">
<input type="text" id="correlation-id-input" placeholder="Enter correlation ID to trace request flow..." />
<button onclick="traceCorrelationId()" class="btn btn-primary">Trace Request</button>
</div>
<div id="correlation-trace-results" class="trace-results hidden">
<!-- Trace timeline populated dynamically -->
</div>
</div>
</div>
<!-- Log Entry Detail Modal -->
<div id="log-entry-detail-modal" class="modal">
<div class="modal-content log-entry-detail">
<div class="log-entry-header">
<h3>Log Entry Details</h3>
<div class="log-entry-meta">
<span class="timestamp"></span>
<span class="level-badge"></span>
<span class="component-badge"></span>
</div>
</div>
<div class="log-entry-content">
<div class="log-field">
<label>Message:</label>
<div class="message-content"></div>
</div>
<div class="log-field">
<label>Correlation ID:</label>
<div class="correlation-id">
<span class="correlation-value"></span>
<button onclick="traceFromModal()" class="btn btn-sm btn-secondary">Trace Flow</button>
</div>
</div>
<div class="log-field">
<label>Context:</label>
<pre class="context-json"></pre>
</div>
<div class="log-field performance-data hidden">
<label>Performance Data:</label>
<div class="performance-metrics"></div>
</div>
<div class="log-field error-data hidden">
<label>Error Details:</label>
<pre class="error-details"></pre>
</div>
</div>
</div>
</div>
Structured Logging JavaScript
class StructuredLoggingManager {
constructor() {
this.currentPage = 1;
this.pageSize = 50;
this.realTimeMode = false;
this.searchFilters = {};
this.realTimeEventSource = null;
}
async searchLogs() {
const query = document.getElementById('log-search-query').value;
const filters = this.collectSearchFilters();
try {
const response = await fetch('/admin/logs/search', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
query: query,
filters: filters,
page: this.currentPage,
page_size: this.pageSize
})
});
const results = await response.json();
this.displayLogResults(results);
} catch (error) {
this.showError('Failed to search logs: ' + error.message);
}
}
collectSearchFilters() {
return {
level: document.getElementById('log-level-filter').value,
component: document.getElementById('component-filter').value,
start_time: document.getElementById('start-time-filter').value,
end_time: document.getElementById('end-time-filter').value,
security_events_only: document.getElementById('security-events-only').checked,
errors_only: document.getElementById('errors-only').checked
};
}
displayLogResults(results) {
const container = document.getElementById('log-entries-container');
container.innerHTML = results.entries.map(entry => `
<div class="log-entry" data-entry-id="${entry.id}">
<div class="log-entry-header">
<span class="timestamp">${new Date(entry.timestamp).toLocaleString()}</span>
<span class="level-badge level-${entry.level.toLowerCase()}">${entry.level}</span>
<span class="component-badge">${entry.component}</span>
${entry.correlation_id ? `<span class="correlation-badge" onclick="traceCorrelationId('${entry.correlation_id}')">${entry.correlation_id}</span>` : ''}
</div>
<div class="log-entry-message" onclick="viewLogDetails('${entry.id}')">
${entry.message}
</div>
<div class="log-entry-context">
${entry.user_id ? `<span class="context-item">User: ${entry.user_id}</span>` : ''}
${entry.duration_ms ? `<span class="context-item">Duration: ${entry.duration_ms}ms</span>` : ''}
${entry.operation_type ? `<span class="context-item">Op: ${entry.operation_type}</span>` : ''}
${entry.is_security_event ? '<span class="context-item security">Security Event</span>' : ''}
</div>
</div>
`).join('');
// Update pagination
document.getElementById('results-count').textContent =
`Showing ${results.entries.length} of ${results.total_count} results`;
document.getElementById('log-current-page').textContent = this.currentPage;
document.getElementById('log-total-pages').textContent = results.total_pages;
}
async viewLogDetails(entryId) {
try {
const response = await fetch(`/admin/logs/${entryId}`);
const entry = await response.json();
this.populateLogDetailModal(entry);
this.openModal('log-entry-detail-modal');
} catch (error) {
this.showError('Failed to load log details: ' + error.message);
}
}
populateLogDetailModal(entry) {
// Scope lookups to the detail modal so identically-classed elements in the results list are untouched
const modal = document.getElementById('log-entry-detail-modal');
// Basic entry info
modal.querySelector('.timestamp').textContent = new Date(entry.timestamp).toLocaleString();
modal.querySelector('.level-badge').textContent = entry.level;
modal.querySelector('.level-badge').className = `level-badge level-${entry.level.toLowerCase()}`;
modal.querySelector('.component-badge').textContent = entry.component;
// Message content
modal.querySelector('.message-content').textContent = entry.message;
// Correlation ID
modal.querySelector('.correlation-value').textContent = entry.correlation_id || 'None';
// Context data
modal.querySelector('.context-json').textContent = JSON.stringify(entry.context || {}, null, 2);
// Performance data
if (entry.performance_metrics) {
modal.querySelector('.performance-data').classList.remove('hidden');
modal.querySelector('.performance-metrics').innerHTML = this.formatPerformanceMetrics(entry.performance_metrics);
}
// Error details
if (entry.error_details) {
modal.querySelector('.error-data').classList.remove('hidden');
modal.querySelector('.error-details').textContent = JSON.stringify(entry.error_details, null, 2);
}
}
async traceCorrelationId(correlationId = null) {
const corrId = correlationId || document.getElementById('correlation-id-input').value;
if (!corrId) {
this.showError('Please enter a correlation ID');
return;
}
try {
const response = await fetch(`/admin/logs/trace/${corrId}`);
const trace = await response.json();
this.displayCorrelationTrace(trace);
} catch (error) {
this.showError('Failed to trace correlation ID: ' + error.message);
}
}
displayCorrelationTrace(trace) {
const container = document.getElementById('correlation-trace-results');
container.innerHTML = `
<div class="trace-header">
<h4>Request Flow for ${trace.correlation_id}</h4>
<div class="trace-summary">
<span>Total Duration: ${trace.total_duration_ms}ms</span>
<span>Components: ${trace.components.length}</span>
<span>Log Entries: ${trace.entries.length}</span>
</div>
</div>
<div class="trace-timeline">
${trace.entries.map((entry, index) => `
<div class="trace-entry">
<div class="trace-time">${new Date(entry.timestamp).toLocaleTimeString()}</div>
<div class="trace-component">${entry.component}</div>
<div class="trace-message">${entry.message}</div>
<div class="trace-duration">
${entry.duration_ms ? `${entry.duration_ms}ms` : ''}
</div>
</div>
`).join('')}
</div>
`;
container.classList.remove('hidden');
}
toggleRealTimeMode() {
this.realTimeMode = !this.realTimeMode;
const button = document.getElementById('realtime-toggle');
if (this.realTimeMode) {
button.textContent = 'Disable Real-time';
this.startRealTimeLogging();
} else {
button.textContent = 'Enable Real-time';
this.stopRealTimeLogging();
}
}
startRealTimeLogging() {
this.realTimeEventSource = new EventSource('/admin/logs/stream');
this.realTimeEventSource.onmessage = (event) => {
const logEntry = JSON.parse(event.data);
this.addLogEntryToTop(logEntry);
};
this.realTimeEventSource.onerror = (error) => {
console.error('Real-time logging error:', error);
this.stopRealTimeLogging();
};
}
stopRealTimeLogging() {
if (this.realTimeEventSource) {
this.realTimeEventSource.close();
this.realTimeEventSource = null;
}
}
addLogEntryToTop(entry) {
const container = document.getElementById('log-entries-container');
const entryHtml = this.createLogEntryHtml(entry);
container.insertAdjacentHTML('afterbegin', entryHtml);
// Remove entries beyond page size to prevent memory issues
const entries = container.querySelectorAll('.log-entry');
if (entries.length > this.pageSize) {
entries[entries.length - 1].remove();
}
}
async quickSearch(searchType) {
const searchQueries = {
'recent-errors': {
query: '',
filters: { level: 'ERROR', start_time: this.getTimeAgo(1, 'hours') }
},
'slow-operations': {
query: 'duration_ms:>2000',
filters: { start_time: this.getTimeAgo(1, 'hours') }
},
'auth-failures': {
query: 'authentication failed',
filters: { component: 'auth_service', start_time: this.getTimeAgo(6, 'hours') }
},
'security-alerts': {
query: '',
filters: { security_events_only: true, start_time: this.getTimeAgo(24, 'hours') }
},
'high-volume-users': {
query: 'user_id:* AND count:>100',
filters: { start_time: this.getTimeAgo(1, 'hours') }
}
};
const searchConfig = searchQueries[searchType];
if (searchConfig) {
document.getElementById('log-search-query').value = searchConfig.query;
this.applySearchFilters(searchConfig.filters);
await this.searchLogs();
}
}
formatPerformanceMetrics(metrics) {
return Object.entries(metrics).map(([key, value]) =>
`<div class="metric-item">
<span class="metric-name">${key}:</span>
<span class="metric-value">${value}</span>
</div>`
).join('');
}
getTimeAgo(amount, unit) {
const now = new Date();
const units = {
hours: 60 * 60 * 1000,
days: 24 * 60 * 60 * 1000
};
return new Date(now.getTime() - (amount * units[unit])).toISOString();
}
}
// Initialize structured logging manager
const loggingManager = new StructuredLoggingManager();
// Auto-load recent logs on page load
document.addEventListener('DOMContentLoaded', () => {
loggingManager.quickSearch('recent-errors');
});
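The search UI above assumes backend endpoints such as /admin/logs/search; a minimal sketch of how that route could be served (the request model and query_log_store() repository call are assumptions, not the gateway's existing API):

from typing import Any, Dict

from fastapi import APIRouter
from pydantic import BaseModel

router = APIRouter(prefix="/admin/logs")

class LogSearchRequest(BaseModel):
    query: str = ""
    filters: Dict[str, Any] = {}
    page: int = 1
    page_size: int = 50

@router.post("/search")
async def search_logs(req: LogSearchRequest) -> Dict[str, Any]:
    # query_log_store() is a placeholder for the database / search-backend call
    entries, total = await query_log_store(req.query, req.filters, req.page, req.page_size)
    return {
        "entries": entries,
        "total_count": total,
        "total_pages": (total + req.page_size - 1) // req.page_size,
    }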
Performance Expectations
Structured Logging Performance Metrics
Logging Operations Performance:
┌───────────────────────────┬───────────────┬─────────────────┐
│ Operation                 │ Target Time   │ Max Acceptable  │
├───────────────────────────┼───────────────┼─────────────────┤
│ Log entry creation        │ <1ms          │ 5ms             │
│ JSON formatting           │ <0.5ms        │ 2ms             │
│ Log search query          │ <500ms        │ 2 seconds       │
│ Correlation ID trace      │ <1 second     │ 5 seconds       │
│ Real-time log streaming   │ <100ms delay  │ 500ms delay     │
│ Log export operation      │ <10 seconds   │ 60 seconds      │
│ Performance analytics     │ <3 seconds    │ 15 seconds      │
└───────────────────────────┴───────────────┴─────────────────┘
Logging System Scalability:
- Support for 1M+ log entries per hour
- 100+ concurrent log searches
- Real-time streaming to 50+ concurrent clients
- Log retention for 90+ days with efficient querying
- Sub-second correlation ID tracing across 10K+ entries
Logging Overhead:
- Application performance impact: <2%
- Memory overhead per request: <1KB
- Disk space efficiency: 70% compression ratio
- Network bandwidth for external destinations: Configurable buffering
Implementation Roadmap
Phase 1: Core Structured Logging (Weeks 1-3)
- Design and implement structured logging framework
- Create StructuredLogger and ComponentLogger classes
- Implement correlation ID generation and propagation
- Build JSON log formatter with sensitive data redaction
- Create basic log routing to console and files
Phase 2: Context & Performance Tracking (Weeks 4-6)
- Implement request context management
- Build performance tracking and timing instrumentation
- Create log enrichment with system and contextual data
- Add FastAPI middleware for automatic request logging
- Implement configurable performance thresholds and alerting
Phase 3: Security & Audit Logging (Weeks 7-9)
- Build SecurityLogger with threat detection
- Implement comprehensive audit trail logging
- Create authentication and authorization event logging
- Add data access logging with classification tracking
- Build security incident detection and alerting
Phase 4: Search & Analysis Interface (Weeks 10-12)
- Create log database schema and indexing
- Build log search API with filtering and pagination
- Implement correlation ID tracing functionality
- Create Admin UI log search and analysis interface
- Add real-time log streaming and visualization
Phase 5: External Integration & Advanced Features (Weeks 13-15)
- Implement external log destination support
- Build log shipping with reliability and buffering
- Add standards-compliant log format support
- Create log analytics and reporting dashboards
- Implement log-based alerting and notification system
Success Criteria
- Negligible application performance impact from logging overhead in production (<2%, per the targets above)
- 100% correlation ID coverage across all request flows
- Sub-second log search for queries across 1M+ entries
- Real-time log streaming with <100ms latency
- 99.9% log delivery reliability to external destinations
- Complete audit trail for all authentication and data access events
- Automated threat detection with <5 minute alert time
- Comprehensive request tracing enabling root cause analysis in <5 minutes
Integration Points
Existing System Integration
- Authentication: Enhanced logging for all auth events and security analysis
- Tool Services: Performance tracking and error analysis for tool invocations
- A2A Integration: Comprehensive logging for A2A task lifecycle and performance
- Marketplace: User activity tracking and analytics for marketplace usage
- Admin UI: Integrated log search and analysis interface
External System Integration
- Monitoring Systems: Prometheus metrics export and APM integration
- SIEM Platforms: Security event forwarding and threat intelligence
- Log Aggregation: Elasticsearch, Splunk, and cloud logging service integration
- Alerting Systems: Integration with PagerDuty, Slack, and custom webhooks
Additional Notes
- Observability: Complete request tracing enables rapid debugging and optimization
- Security: Comprehensive audit trails and threat detection enhance security posture
- Analytics: Rich performance data enables data-driven optimization decisions
- Enterprise Ready: Audit logging and compliance features meet enterprise requirements
- Performance: Async logging and efficient routing minimize application impact
- Flexible: Multiple output destinations and configurable formats support diverse environments
- Scalable: Designed to handle high-volume logging requirements efficiently
- Actionable: Smart alerting and analytics provide actionable insights for operations teams