Monitoring and Observability for Python APIs: A Complete Guide
Introduction
Monitoring and observability are critical for maintaining healthy production APIs. Without proper monitoring, you're flying blind when issues occur. In this guide, we'll explore how to implement comprehensive monitoring, logging, and observability for Django and FastAPI applications.
📊 Why Observability Matters: Good observability helps you detect issues before users do, understand system behavior, and make data-driven decisions about performance and reliability.
The Three Pillars of Observability
Observability consists of three main components:
- Metrics: Quantitative measurements over time
- Logs: Event records with timestamps
- Traces: Request flows through distributed systems
Setting Up Structured Logging
Django Logging Configuration
Configure structured logging in Django (requires the python-json-logger package: pip install python-json-logger). Make sure the logs/ directory exists before the server starts:
# settings.py
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'json': {
'()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
'format': '%(asctime)s %(name)s %(levelname)s %(message)s %(pathname)s %(lineno)d',
},
'verbose': {
'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
'style': '{',
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'json',
},
'file': {
'class': 'logging.handlers.RotatingFileHandler',
'filename': 'logs/django.log',
'maxBytes': 1024 * 1024 * 10, # 10 MB
'backupCount': 5,
'formatter': 'json',
},
},
'root': {
'handlers': ['console', 'file'],
'level': 'INFO',
},
'loggers': {
'django': {
'handlers': ['console', 'file'],
'level': 'INFO',
'propagate': False,
},
'myapp': {
'handlers': ['console', 'file'],
'level': 'DEBUG',
'propagate': False,
},
},
}
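With this configuration in place, application code logs through named loggers, and context passed via `extra` becomes extra fields on the log record, which the JSON formatter emits as top-level keys. A minimal stdlib-only sketch of that mechanism (using a plain formatter and an in-memory buffer for brevity):

```python
import io
import logging

# Fields passed via `extra` become attributes on the LogRecord,
# so any formatter (including the JSON one above) can reference them.
logger = logging.getLogger("myapp")
logger.propagate = False
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s user=%(user_id)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("order created", extra={"user_id": 42})
print(buffer.getvalue().strip())  # INFO order created user=42
```

With the JSON formatter from the LOGGING config, the same call produces a JSON object with a "user_id" key instead of the plain-text line.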
FastAPI Logging Configuration
Set up structured logging for FastAPI:
# app/core/logging.py
import logging
import sys
from pythonjsonlogger import jsonlogger
def setup_logging():
root_logger = logging.getLogger()
if root_logger.handlers: # avoid duplicate handlers on repeated imports or reloads
return root_logger
root_logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = jsonlogger.JsonFormatter(
'%(asctime)s %(name)s %(levelname)s %(message)s %(pathname)s %(lineno)d'
)
handler.setFormatter(formatter)
root_logger.addHandler(handler)
return root_logger
# app/main.py
import time
from fastapi import FastAPI, Request
from app.core.logging import setup_logging
logger = setup_logging()
app = FastAPI()
@app.middleware("http")
async def log_requests(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
logger.info(
"Request processed",
extra={
"method": request.method,
"path": request.url.path,
"status_code": response.status_code,
"process_time": process_time,
"client_ip": request.client.host,
}
)
response.headers["X-Process-Time"] = str(process_time)
return response
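Request logs are far easier to correlate when every line carries a request ID. One common pattern (a sketch, not tied to any particular framework) stores the ID in a contextvars.ContextVar set by middleware and injects it into every record via a logging filter:

```python
import contextvars
import logging
import uuid

request_id_var = contextvars.ContextVar("request_id", default="-")

class RequestIdFilter(logging.Filter):
    """Copy the current request ID onto every log record."""
    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True

# In the middleware above, before calling call_next:
#     request_id_var.set(request.headers.get("X-Request-ID", str(uuid.uuid4())))

logger = logging.getLogger("app")
logger.addFilter(RequestIdFilter())
# Formatters can then reference %(request_id)s alongside the other fields.
```

Because ContextVar is task-local, concurrent requests in the same process each see their own ID.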
Metrics Collection
Using Prometheus with Django
Install and configure Prometheus client:
pip install django-prometheus
# settings.py
INSTALLED_APPS = [
'django_prometheus',
# ... other apps
]
MIDDLEWARE = [
'django_prometheus.middleware.PrometheusBeforeMiddleware',
# ... other middleware
'django_prometheus.middleware.PrometheusAfterMiddleware',
]
# urls.py
from django.urls import path, include
urlpatterns = [
path('', include('django_prometheus.urls')),
# ... other URLs
]
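Once the middleware and URL are in place, Prometheus scrapes a plain-text exposition format from /metrics. A quick sketch using prometheus_client directly (which django-prometheus builds on; the metric name here is illustrative) shows what that output looks like:

```python
from prometheus_client import CollectorRegistry, Counter, generate_latest

# A private registry keeps this demonstration isolated from the global one.
registry = CollectorRegistry()
requests_total = Counter(
    "demo_requests_total", "Total demo requests",
    ["method"], registry=registry,
)
requests_total.labels(method="GET").inc()

# This is the text format Prometheus scrapes from /metrics.
exposition = generate_latest(registry).decode()
print(exposition)
```

Each labeled combination becomes its own time series, which is why label cardinality (e.g. raw URLs with IDs in them) should be kept bounded.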
Custom Metrics
Create custom business metrics:
# app/metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time
# Request metrics
http_requests_total = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
http_request_duration = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint']
)
# Business metrics
users_registered = Counter(
'users_registered_total',
'Total users registered'
)
active_users = Gauge(
'active_users_current',
'Current number of active users'
)
# Usage in views (assumes Django REST Framework)
import time
from rest_framework.decorators import api_view
from rest_framework.response import Response
from app.metrics import http_requests_total, http_request_duration
@api_view(['GET'])
def my_view(request):
start_time = time.time()
try:
# Your view logic
result = process_request(request)
# Record metrics
http_requests_total.labels(
method=request.method,
endpoint=request.path,
status=200
).inc()
return Response(result)
except Exception as e:
http_requests_total.labels(
method=request.method,
endpoint=request.path,
status=500
).inc()
raise
finally:
duration = time.time() - start_time
http_request_duration.labels(
method=request.method,
endpoint=request.path
).observe(duration)
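Repeating that try/except/finally bookkeeping in every view is error-prone; a decorator can centralize it. A sketch (metric and view names are illustrative, and a private registry keeps the demo self-contained):

```python
import functools
import time
from prometheus_client import CollectorRegistry, Counter, Histogram

registry = CollectorRegistry()
calls_total = Counter("view_calls_total", "Calls per view and outcome",
                      ["view", "status"], registry=registry)
call_duration = Histogram("view_duration_seconds", "View duration",
                          ["view"], registry=registry)

def track_metrics(func):
    """Record call count, outcome, and duration for the wrapped view."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        status = "error"
        try:
            result = func(*args, **kwargs)
            status = "ok"
            return result
        finally:
            # finally runs on both paths, so duration is always recorded.
            calls_total.labels(view=func.__name__, status=status).inc()
            call_duration.labels(view=func.__name__).observe(time.time() - start)
    return wrapper

@track_metrics
def my_view(x):
    return x * 2
```

Applied to the Django view above, the decorator replaces all of the inline metric calls while keeping the view body focused on business logic.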
FastAPI Metrics with Prometheus
# app/core/metrics.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Response
from starlette.middleware.base import BaseHTTPMiddleware
import time
http_requests_total = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
http_request_duration = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint']
)
class MetricsMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
http_requests_total.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
http_request_duration.labels(
method=request.method,
endpoint=request.url.path
).observe(duration)
return response
# app/main.py
from fastapi import FastAPI, Response
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from app.core.metrics import MetricsMiddleware
app = FastAPI()
app.add_middleware(MetricsMiddleware)
@app.get("/metrics")
async def metrics():
return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
Distributed Tracing
Using OpenTelemetry
Implement distributed tracing with OpenTelemetry:
# Install: pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp opentelemetry-instrumentation-fastapi opentelemetry-instrumentation-httpx
# app/core/tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
def setup_tracing(app):
trace.set_tracer_provider(TracerProvider())
otlp_exporter = OTLPSpanExporter(
endpoint="http://localhost:4317",
insecure=True,
)
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
FastAPIInstrumentor.instrument_app(app)
HTTPXClientInstrumentor().instrument()
# app/main.py
from app.core.tracing import setup_tracing
setup_tracing(app)
Custom Spans
Create custom spans for important operations:
from opentelemetry import trace
tracer = trace.get_tracer(__name__)
async def process_order(order_id: int):
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("order.id", order_id)
# Process payment
with tracer.start_as_current_span("process_payment") as payment_span:
payment_span.set_attribute("payment.amount", 100.00)
# Payment logic
payment_result = await charge_card(order_id)
payment_span.set_attribute("payment.status", payment_result.status)
# Update inventory
with tracer.start_as_current_span("update_inventory") as inventory_span:
# Inventory logic
await update_stock(order_id)
span.set_attribute("order.status", "completed")
Application Performance Monitoring (APM)
Using Sentry for Error Tracking
Integrate Sentry for error tracking:
# Install: pip install sentry-sdk
# settings.py (Django)
import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
sentry_sdk.init(
dsn="https://your-sentry-dsn@sentry.io/your-project-id",
integrations=[DjangoIntegration()],
traces_sample_rate=1.0, # lower this in high-traffic production
send_default_pii=True, # only if capturing request/user data is acceptable
environment="production",
)
# app/main.py (FastAPI)
import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
sentry_sdk.init(
dsn="https://your-sentry-dsn@sentry.io/your-project-id",
integrations=[
FastApiIntegration(),
SqlalchemyIntegration(),
],
traces_sample_rate=1.0,
environment="production",
)
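Both initializations accept a before_send hook, which lets you scrub sensitive values out of events before they leave the process (the field names below are illustrative):

```python
SENSITIVE_KEYS = {"password", "token", "authorization", "secret"}

def scrub_event(event, hint):
    """Redact sensitive keys from request data before Sentry sends the event."""
    request = event.get("request", {})
    data = request.get("data")
    if isinstance(data, dict):
        request["data"] = {
            key: "[redacted]" if key.lower() in SENSITIVE_KEYS else value
            for key, value in data.items()
        }
    return event  # returning None would drop the event entirely

# Passed as sentry_sdk.init(..., before_send=scrub_event)
```

This pairs with the "protect sensitive data" rule below: scrubbing at the SDK boundary catches values that slip past logging discipline.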
Using Datadog APM
# Install: pip install ddtrace
# ddtrace auto-instruments supported frameworks when patch_all() runs
# before the framework code is imported.
# For Django
# settings.py (at the very top of the file)
from ddtrace import patch_all
patch_all()
# For FastAPI
# app/main.py (before creating the app)
from ddtrace import patch_all
patch_all()
# Alternatively, avoid code changes entirely and launch with the wrapper:
# ddtrace-run uvicorn app.main:app
Health Checks
Comprehensive Health Check Endpoint
# app/api/health.py
import time
from fastapi import APIRouter, Depends, status
from fastapi.responses import JSONResponse
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import get_db
from app.core.redis import get_redis # adjust to wherever your Redis dependency lives
router = APIRouter()
@router.get("/health")
async def health_check():
"""Basic health check."""
return {"status": "healthy"}
@router.get("/health/ready")
async def readiness_check(db: AsyncSession = Depends(get_db)):
"""Readiness check - verifies database connectivity."""
try:
await db.execute(text("SELECT 1"))
return {"status": "ready", "database": "connected"}
except Exception as e:
return JSONResponse(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
content={"status": "not ready", "error": str(e)}
)
@router.get("/health/live")
async def liveness_check():
"""Liveness check - verifies application is running."""
return {"status": "alive", "timestamp": time.time()}
@router.get("/health/detailed")
async def detailed_health_check(
db: AsyncSession = Depends(get_db),
redis_client = Depends(get_redis)
):
"""Detailed health check with all dependencies."""
health_status = {
"status": "healthy",
"timestamp": time.time(),
"checks": {}
}
# Database check
try:
await db.execute(text("SELECT 1"))
health_status["checks"]["database"] = "healthy"
except Exception as e:
health_status["checks"]["database"] = f"unhealthy: {str(e)}"
health_status["status"] = "degraded"
# Redis check
try:
await redis_client.ping()
health_status["checks"]["redis"] = "healthy"
except Exception as e:
health_status["checks"]["redis"] = f"unhealthy: {str(e)}"
health_status["status"] = "degraded"
status_code = status.HTTP_200_OK
if health_status["status"] != "healthy":
status_code = status.HTTP_503_SERVICE_UNAVAILABLE
return JSONResponse(content=health_status, status_code=status_code)
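The per-dependency try/except pattern in the detailed check is worth factoring into a small helper so each new dependency is a one-liner. A framework-free sketch (the check names and stand-in functions are illustrative):

```python
import time
from typing import Callable, Dict

def aggregate_health(checks: Dict[str, Callable[[], None]]) -> dict:
    """Run each check; any exception marks that dependency unhealthy
    and degrades the overall status."""
    result = {"status": "healthy", "timestamp": time.time(), "checks": {}}
    for name, check in checks.items():
        try:
            check()
            result["checks"][name] = "healthy"
        except Exception as exc:
            result["checks"][name] = f"unhealthy: {exc}"
            result["status"] = "degraded"
    return result

def _db_check():    # stand-in for: await db.execute(text("SELECT 1"))
    pass

def _redis_check(): # stand-in for: await redis_client.ping()
    raise ConnectionError("connection refused")

report = aggregate_health({"database": _db_check, "redis": _redis_check})
```

In the async endpoint the checks would be awaitables, but the aggregation logic stays the same.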
Log Aggregation
Using ELK Stack
Send logs to Elasticsearch:
# app/core/logging.py
import logging
import requests
class ElasticsearchHandler(logging.Handler):
"""Post each formatted record to Elasticsearch over HTTP.
A simplified example: production setups should ship logs through a
buffered agent (Filebeat/Logstash) or the elasticsearch client's bulk
API rather than one request per record."""
def __init__(self, host, port, index_name):
super().__init__()
self.url = f"http://{host}:{port}/{index_name}/_doc"
def emit(self, record):
try:
# Pair this handler with the JSON formatter so the body is valid JSON
payload = self.format(record)
requests.post(self.url, data=payload, headers={"Content-Type": "application/json"}, timeout=5)
except Exception:
self.handleError(record)
Using CloudWatch (AWS)
# Install: pip install watchtower
import watchtower
import logging
# Configure CloudWatch handler
cloudwatch_handler = watchtower.CloudWatchLogHandler(
log_group="my-api",
stream_name="application"
)
logger = logging.getLogger()
logger.addHandler(cloudwatch_handler)
Alerting
Setting Up Alerts
Configure alerts based on metrics:
# app/core/alerts.py
from prometheus_client import Counter
import smtplib
from email.mime.text import MIMEText
error_counter = Counter('application_errors_total', 'Total application errors')
def send_alert(message: str, severity: str = "warning"):
"""Send alert notification."""
# Implement your alerting logic
# Email, Slack, PagerDuty, etc.
pass
# Usage
error_count = 0 # Counter exposes no public read API, so keep a local tally for alerting
try:
# Your code
pass
except Exception as e:
error_counter.inc()
error_count += 1
if error_count > 100:
send_alert(f"High error rate detected: {error_count} errors", "critical")
raise
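A raw error count only ever grows, so a fixed threshold fires once and then keeps firing forever. Alerting on errors within a sliding time window is a better trigger; a stdlib-only sketch:

```python
import time
from collections import deque
from typing import Optional

class SlidingWindowAlert:
    """Fire when more than `threshold` errors occur within `window` seconds."""
    def __init__(self, threshold: int, window: float = 60.0):
        self.threshold = threshold
        self.window = window
        self.timestamps = deque()

    def record_error(self, now: Optional[float] = None) -> bool:
        """Record one error; return True if the alert should fire."""
        now = time.time() if now is None else now
        self.timestamps.append(now)
        # Drop errors that have fallen out of the window.
        while self.timestamps and self.timestamps[0] <= now - self.window:
            self.timestamps.popleft()
        return len(self.timestamps) > self.threshold

alerter = SlidingWindowAlert(threshold=3, window=60.0)
```

In practice this logic usually lives in Prometheus Alertmanager as a rate() rule rather than in application code, but the same windowing idea applies.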
Dashboard Creation
Grafana Dashboard Configuration
Create a Grafana dashboard for visualization:
{
"dashboard": {
"title": "API Monitoring Dashboard",
"panels": [
{
"title": "Request Rate",
"targets": [
{
"expr": "rate(http_requests_total[5m])",
"legendFormat": "{{method}} {{endpoint}}"
}
]
},
{
"title": "Error Rate",
"targets": [
{
"expr": "rate(http_requests_total{status=~'5..'}[5m])",
"legendFormat": "Errors"
}
]
},
{
"title": "Response Time",
"targets": [
{
"expr": "histogram_quantile(0.95, http_request_duration_seconds_bucket)",
"legendFormat": "p95"
}
]
}
]
}
}
Best Practices
- Log at appropriate levels: Use DEBUG for development, INFO for normal operations, WARNING for potential issues, ERROR for errors
- Include context: Always include request IDs, user IDs, and relevant context in logs
- Monitor key metrics: Track request rate, error rate, latency, and business metrics
- Set up alerts: Configure alerts for critical errors and performance degradation
- Use structured logging: JSON logs are easier to parse and query
- Sample traces: Don't trace every request in high-traffic systems
- Protect sensitive data: Never log passwords, tokens, or PII
- Centralize logs: Use log aggregation services for distributed systems
Conclusion
Implementing comprehensive monitoring and observability is essential for maintaining production APIs. By combining metrics, logs, and traces, you'll have full visibility into your application's behavior and be able to quickly identify and resolve issues.
🚀 Next Steps: Consider setting up automated alerting, creating custom dashboards, and implementing log retention policies.