Monitoring and Observability for Python APIs: A Complete Guide

Introduction

Monitoring and observability are critical for maintaining healthy production APIs. Without proper monitoring, you're flying blind when issues occur. In this guide, we'll explore how to implement comprehensive monitoring, logging, and observability for Django and FastAPI applications.

📊 Why Observability Matters: Good observability helps you detect issues before users do, understand system behavior, and make data-driven decisions about performance and reliability.

The Three Pillars of Observability

Observability consists of three main components; the short sketch after this list shows all three emitted around a single operation:

  1. Metrics: Quantitative measurements over time
  2. Logs: Event records with timestamps
  3. Traces: Request flows through distributed systems
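
A minimal sketch tying the three together, assuming the prometheus_client and OpenTelemetry packages configured later in this guide (handle_order and order_id are hypothetical):

import logging

from opentelemetry import trace
from prometheus_client import Counter

logger = logging.getLogger(__name__)
orders_total = Counter('orders_processed_total', 'Orders processed')
tracer = trace.get_tracer(__name__)

def handle_order(order_id: int):
    with tracer.start_as_current_span("handle_order"):                  # trace
        logger.info("Processing order", extra={"order_id": order_id})   # log
        orders_total.inc()                                              # metric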

Setting Up Structured Logging

Django Logging Configuration

Configure structured logging in Django; the JSON formatter comes from the python-json-logger package:

# Install: pip install python-json-logger

# settings.py
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'json': {
            '()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
            'format': '%(asctime)s %(name)s %(levelname)s %(message)s %(pathname)s %(lineno)d',
        },
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'json',
        },
        'file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'logs/django.log',  # the logs/ directory must exist at startup
            'maxBytes': 1024 * 1024 * 10,  # 10 MB
            'backupCount': 5,
            'formatter': 'json',
        },
    },
    'root': {
        'handlers': ['console', 'file'],
        'level': 'INFO',
    },
    'loggers': {
        'django': {
            'handlers': ['console', 'file'],
            'level': 'INFO',
            'propagate': False,
        },
        'myapp': {
            'handlers': ['console', 'file'],
            'level': 'DEBUG',
            'propagate': False,
        },
    },
}
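
With this configuration in place, application code retrieves a logger by name and attaches structured context via the extra argument; a minimal sketch using the myapp logger configured above (create_order is a hypothetical view):

# myapp/views.py
import logging

logger = logging.getLogger('myapp')

def create_order(request):
    # Extra fields become top-level keys in the JSON log line
    logger.info(
        "Order created",
        extra={"user_id": request.user.id, "path": request.path},
    )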

FastAPI Logging Configuration

Set up structured logging for FastAPI:

# app/core/logging.py
import logging
import sys
from pythonjsonlogger import jsonlogger

def setup_logging():
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.INFO)
    
    formatter = jsonlogger.JsonFormatter(
        '%(asctime)s %(name)s %(levelname)s %(message)s %(pathname)s %(lineno)d'
    )
    handler.setFormatter(formatter)
    
    root_logger.addHandler(handler)
    
    return root_logger

# app/main.py
import time

from fastapi import FastAPI, Request

from app.core.logging import setup_logging

logger = setup_logging()
app = FastAPI()

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = time.time()
    
    response = await call_next(request)
    
    process_time = time.time() - start_time
    
    logger.info(
        "Request processed",
        extra={
            "method": request.method,
            "path": request.url.path,
            "status_code": response.status_code,
            "process_time": process_time,
            "client_ip": request.client.host,
        }
    )
    
    response.headers["X-Process-Time"] = str(process_time)
    return response
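
A common refinement, echoed in Best Practices below, is tagging every log line with a request ID so all entries for one request can be correlated. A sketch using contextvars and a logging filter; the X-Request-ID header name is an assumption:

# app/core/request_id.py
import contextvars
import logging
import uuid

request_id_var = contextvars.ContextVar("request_id", default="-")

class RequestIdFilter(logging.Filter):
    """Copy the current request ID onto every log record."""
    def filter(self, record):
        record.request_id = request_id_var.get()
        return True

# In the middleware above, before calling call_next:
#     request_id_var.set(request.headers.get("X-Request-ID", str(uuid.uuid4())))
# Then add the filter to your handler and %(request_id)s to the format string.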

Metrics Collection

Using Prometheus with Django

Install and configure Prometheus client:

pip install django-prometheus
# settings.py
INSTALLED_APPS = [
    'django_prometheus',
    # ... other apps
]

MIDDLEWARE = [
    'django_prometheus.middleware.PrometheusBeforeMiddleware',
    # ... other middleware
    'django_prometheus.middleware.PrometheusAfterMiddleware',
]

# urls.py
from django.urls import path, include

urlpatterns = [
    path('', include('django_prometheus.urls')),
    # ... other URLs
]

Custom Metrics

Create custom business metrics:

# app/metrics.py
from prometheus_client import Counter, Histogram, Gauge

# Request metrics
http_requests_total = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

http_request_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)

# Business metrics
users_registered = Counter(
    'users_registered_total',
    'Total users registered'
)

active_users = Gauge(
    'active_users_current',
    'Current number of active users'
)

# Usage in views (Django REST Framework)
import time

from rest_framework.decorators import api_view
from rest_framework.response import Response

from app.metrics import http_requests_total, http_request_duration

@api_view(['GET'])
def my_view(request):
    start_time = time.time()
    
    try:
        # Your view logic
        result = process_request(request)
        
        # Record metrics
        http_requests_total.labels(
            method=request.method,
            endpoint=request.path,
            status=200
        ).inc()
        
        return Response(result)
    except Exception as e:
        http_requests_total.labels(
            method=request.method,
            endpoint=request.path,
            status=500
        ).inc()
        raise
    finally:
        duration = time.time() - start_time
        http_request_duration.labels(
            method=request.method,
            endpoint=request.path
        ).observe(duration)
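
The business metrics defined earlier need explicit updates from application code: a Counter only increases, while a Gauge moves in both directions. A sketch wiring them to Django's auth signals (the registration hook is hypothetical):

# app/signals.py
from django.contrib.auth.signals import user_logged_in, user_logged_out
from django.dispatch import receiver

from app.metrics import active_users, users_registered

def on_user_registered(user):
    users_registered.inc()  # Counter: monotonically increasing

@receiver(user_logged_in)
def on_login(sender, user, request, **kwargs):
    active_users.inc()      # Gauge: goes up ...

@receiver(user_logged_out)
def on_logout(sender, user, request, **kwargs):
    active_users.dec()      # ... and down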

FastAPI Metrics with Prometheus

# app/core/metrics.py
from prometheus_client import Counter, Histogram, generate_latest
from starlette.middleware.base import BaseHTTPMiddleware
import time

http_requests_total = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

http_request_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)

class MetricsMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        start_time = time.time()
        
        response = await call_next(request)
        
        duration = time.time() - start_time
        
        # Note: raw paths (/users/123) create one label set per ID; prefer the
        # matched route template (e.g. request.scope.get("route")) in production
        # to keep label cardinality bounded.
        http_requests_total.labels(
            method=request.method,
            endpoint=request.url.path,
            status=response.status_code
        ).inc()
        
        http_request_duration.labels(
            method=request.method,
            endpoint=request.url.path
        ).observe(duration)
        
        return response

# app/main.py
from fastapi import Response
from prometheus_client import CONTENT_TYPE_LATEST

from app.core.metrics import MetricsMiddleware, generate_latest

app.add_middleware(MetricsMiddleware)

@app.get("/metrics")
async def metrics():
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

Distributed Tracing

Using OpenTelemetry

Implement distributed tracing with OpenTelemetry:

# Install: pip install opentelemetry-sdk opentelemetry-exporter-otlp opentelemetry-instrumentation-fastapi opentelemetry-instrumentation-httpx

# app/core/tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

def setup_tracing(app):
    trace.set_tracer_provider(TracerProvider())
    
    otlp_exporter = OTLPSpanExporter(
        endpoint="http://localhost:4317",  # your OTLP collector (Jaeger, Tempo, etc.)
        insecure=True,
    )
    
    span_processor = BatchSpanProcessor(otlp_exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)
    
    FastAPIInstrumentor.instrument_app(app)
    HTTPXClientInstrumentor().instrument()

# app/main.py
from app.core.tracing import setup_tracing

setup_tracing(app)  # pass the FastAPI instance created above

Custom Spans

Create custom spans for important operations:

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

async def process_order(order_id: int):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        
        # Process payment
        with tracer.start_as_current_span("process_payment") as payment_span:
            payment_span.set_attribute("payment.amount", 100.00)
            # Payment logic
            payment_result = await charge_card(order_id)
            payment_span.set_attribute("payment.status", payment_result.status)
        
        # Update inventory
        with tracer.start_as_current_span("update_inventory") as inventory_span:
            # Inventory logic
            await update_stock(order_id)
        
        span.set_attribute("order.status", "completed")

Application Performance Monitoring (APM)

Using Sentry for Error Tracking

Integrate Sentry for error tracking:

# Install: pip install sentry-sdk

# settings.py (Django)
import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration

sentry_sdk.init(
    dsn="https://your-sentry-dsn@sentry.io/your-project-id",
    integrations=[DjangoIntegration()],
    traces_sample_rate=1.0,  # lower this (e.g. 0.1) in high-traffic production
    send_default_pii=True,   # attaches user data; weigh against the PII guidance below
    environment="production",
)

# app/main.py (FastAPI)
import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

sentry_sdk.init(
    dsn="https://your-sentry-dsn@sentry.io/your-project-id",
    integrations=[
        FastApiIntegration(),
        SqlalchemyIntegration(),
    ],
    traces_sample_rate=1.0,  # lower this (e.g. 0.1) in high-traffic production
    environment="production",
)
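
Once initialized, Sentry captures unhandled exceptions automatically; handled errors and user context can be reported explicitly with standard sentry-sdk calls (user_id, order_id, and PaymentError stand in for your own names):

import sentry_sdk

sentry_sdk.set_user({"id": user_id})      # tie events to a user
sentry_sdk.set_tag("order_id", order_id)  # searchable tag on future events

try:
    process_order(order_id)
except PaymentError as e:
    sentry_sdk.capture_exception(e)       # report the error but keep serving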

Using Datadog APM

# Install: pip install ddtrace

# For Django
# settings.py
from ddtrace import patch_all

patch_all()  # auto-instruments Django, database drivers, caches, etc.

# For FastAPI
# app/main.py
from ddtrace import patch_all

patch_all()  # picks up FastAPI/Starlette automatically; no middleware needed

# Alternatively, skip patch_all() and launch under the ddtrace-run wrapper:
#   ddtrace-run uvicorn app.main:app
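
Beyond auto-instrumentation, ddtrace exposes a global tracer for custom spans; a short sketch (the operation and service names are placeholders):

from ddtrace import tracer

def process_order(order_id: int):
    with tracer.trace("order.process", service="my-api") as span:
        span.set_tag("order.id", order_id)
        # ... business logic ...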

Health Checks

Comprehensive Health Check Endpoint

# app/api/health.py
import time

from fastapi import APIRouter, Depends, status
from fastapi.responses import JSONResponse
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.session import get_db
from app.core.redis import get_redis  # assumed Redis dependency provider

router = APIRouter()

@router.get("/health")
async def health_check():
    """Basic health check."""
    return {"status": "healthy"}

@router.get("/health/ready")
async def readiness_check(db: AsyncSession = Depends(get_db)):
    """Readiness check - verifies database connectivity."""
    try:
        await db.execute(text("SELECT 1"))
        return {"status": "ready", "database": "connected"}
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={"status": "not ready", "error": str(e)}
        )

@router.get("/health/live")
async def liveness_check():
    """Liveness check - verifies application is running."""
    return {"status": "alive", "timestamp": time.time()}

@router.get("/health/detailed")
async def detailed_health_check(
    db: AsyncSession = Depends(get_db),
    redis_client = Depends(get_redis)
):
    """Detailed health check with all dependencies."""
    health_status = {
        "status": "healthy",
        "timestamp": time.time(),
        "checks": {}
    }
    
    # Database check
    try:
        await db.execute(text("SELECT 1"))
        health_status["checks"]["database"] = "healthy"
    except Exception as e:
        health_status["checks"]["database"] = f"unhealthy: {str(e)}"
        health_status["status"] = "degraded"
    
    # Redis check
    try:
        await redis_client.ping()
        health_status["checks"]["redis"] = "healthy"
    except Exception as e:
        health_status["checks"]["redis"] = f"unhealthy: {str(e)}"
        health_status["status"] = "degraded"
    
    status_code = status.HTTP_200_OK
    if health_status["status"] != "healthy":
        status_code = status.HTTP_503_SERVICE_UNAVAILABLE
    
    return JSONResponse(content=health_status, status_code=status_code)

Log Aggregation

Using ELK Stack

Send logs to Elasticsearch. The stdlib HTTPHandler form-encodes records and ignores the formatter, so a small custom handler is needed to ship JSON (in production, log shippers like Filebeat or Logstash are the more robust choice):

# app/core/logging.py
import logging

import requests

class ElasticsearchHandler(logging.Handler):
    """POST each formatted record to an Elasticsearch index."""

    def __init__(self, host, port, index_name):
        super().__init__()
        self.url = f"http://{host}:{port}/{index_name}/_doc"

    def emit(self, record):
        try:
            requests.post(
                self.url,
                data=self.format(record),  # JSON when paired with JsonFormatter
                headers={"Content-Type": "application/json"},
                timeout=1,
            )
        except Exception:
            self.handleError(record)
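
Wiring it up pairs the handler with the JSON formatter from earlier; localhost:9200 and the api-logs index name are placeholders:

import logging
from pythonjsonlogger import jsonlogger

es_handler = ElasticsearchHandler("localhost", 9200, "api-logs")
es_handler.setFormatter(jsonlogger.JsonFormatter(
    '%(asctime)s %(name)s %(levelname)s %(message)s'
))

logging.getLogger().addHandler(es_handler)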

Using CloudWatch (AWS)

# Install: pip install watchtower

import watchtower
import logging

# Configure the CloudWatch handler; AWS credentials come from the standard
# boto3 chain. Note: recent watchtower releases renamed these parameters to
# log_group_name and log_stream_name.
cloudwatch_handler = watchtower.CloudWatchLogHandler(
    log_group="my-api",
    stream_name="application"
)

logger = logging.getLogger()
logger.addHandler(cloudwatch_handler)

Alerting

Setting Up Alerts

Configure alerts based on metrics. In a Prometheus setup, threshold alerts usually belong in Alertmanager rules rather than application code, but a simple in-process hook looks like this:

# app/core/alerts.py
from prometheus_client import Counter

error_counter = Counter('application_errors_total', 'Total application errors')
error_count = 0  # local mirror; Counter values aren't meant to be read back

def send_alert(message: str, severity: str = "warning"):
    """Send alert notification."""
    # Implement your alerting logic: email, Slack, PagerDuty, etc.
    pass

# Usage
try:
    # Your code
    pass
except Exception:
    error_counter.inc()
    error_count += 1
    
    if error_count > 100:
        send_alert(f"High error rate detected: {error_count}", "critical")
    
    raise
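
As a concrete send_alert implementation, here is a sketch that posts to a Slack incoming webhook; the webhook URL is a placeholder you would load from configuration:

import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/..."  # placeholder

def send_alert(message: str, severity: str = "warning"):
    """Post an alert to a Slack channel via an incoming webhook."""
    requests.post(
        SLACK_WEBHOOK_URL,
        json={"text": f"[{severity.upper()}] {message}"},
        timeout=5,
    )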

Dashboard Creation

Grafana Dashboard Configuration

Create a Grafana dashboard for visualization:

{
  "dashboard": {
    "title": "API Monitoring Dashboard",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "rate(http_requests_total[5m])",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ]
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(http_requests_total{status=~'5..'}[5m])",
            "legendFormat": "Errors"
          }
        ]
      },
      {
        "title": "Response Time",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, http_request_duration_seconds_bucket)",
            "legendFormat": "p95"
          }
        ]
      }
    ]
  }
}

Best Practices

  1. Log at appropriate levels: Use DEBUG for development, INFO for normal operations, WARNING for potential issues, ERROR for errors
  2. Include context: Always include request IDs, user IDs, and relevant context in logs
  3. Monitor key metrics: Track request rate, error rate, latency, and business metrics
  4. Set up alerts: Configure alerts for critical errors and performance degradation
  5. Use structured logging: JSON logs are easier to parse and query
  6. Sample traces: Don't trace every request in high-traffic systems (see the sampling sketch after this list)
  7. Protect sensitive data: Never log passwords, tokens, or PII
  8. Centralize logs: Use log aggregation services for distributed systems
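
For practice 6, the OpenTelemetry SDK ships a probabilistic sampler; a sketch keeping roughly 10% of traces while honoring upstream sampling decisions:

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

# Sample ~10% of new traces; ParentBased defers to the caller's decision
provider = TracerProvider(sampler=ParentBased(TraceIdRatioBased(0.1)))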

Conclusion

Implementing comprehensive monitoring and observability is essential for maintaining production APIs. By combining metrics, logs, and traces, you'll have full visibility into your application's behavior and be able to quickly identify and resolve issues.

🚀 Next Steps: Consider setting up automated alerting, creating custom dashboards, and implementing log retention policies.

