Implementing Real-Time Features with WebSockets in FastAPI and Django

Introduction

Real-time features have become essential for modern web applications. Whether it's live chat, notifications, collaborative editing, or live data updates, WebSockets provide a bidirectional communication channel between client and server. In this guide, we'll explore how to implement WebSocket-based real-time features in both FastAPI and Django.

🔌 Why WebSockets? Unlike HTTP's request-response model, WebSockets maintain a persistent connection, enabling real-time, bidirectional communication with low latency.

Understanding WebSockets

How WebSockets Work

  1. Handshake: Client initiates connection with HTTP upgrade request
  2. Connection: Server accepts and upgrades to WebSocket protocol
  3. Communication: Bidirectional message exchange
  4. Closure: Either side can close the connection

WebSocket vs HTTP

  • HTTP: Request-response, stateless, higher overhead
  • WebSocket: Persistent connection, stateful, lower overhead, real-time

WebSockets in FastAPI

Basic WebSocket Endpoint

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")

# Client-side (JavaScript)
const ws = new WebSocket("ws://localhost:8000/ws");

ws.onopen = () => {
    console.log("Connected");
    ws.send("Hello Server!");
};

ws.onmessage = (event) => {
    console.log("Received:", event.data);
};

ws.onerror = (error) => {
    console.error("Error:", error);
};

ws.onclose = () => {
    console.log("Disconnected");
};

Connection Manager

Create a connection manager to handle multiple clients:

from fastapi import WebSocket
from typing import List, Dict
import json

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []
        self.user_connections: Dict[str, List[WebSocket]] = {}
    
    async def connect(self, websocket: WebSocket, user_id: str = None):
        await websocket.accept()
        self.active_connections.append(websocket)
        
        if user_id:
            if user_id not in self.user_connections:
                self.user_connections[user_id] = []
            self.user_connections[user_id].append(websocket)
    
    def disconnect(self, websocket: WebSocket, user_id: str = None):
        self.active_connections.remove(websocket)
        
        if user_id and user_id in self.user_connections:
            self.user_connections[user_id].remove(websocket)
            if not self.user_connections[user_id]:
                del self.user_connections[user_id]
    
    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)
    
    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)
    
    async def send_to_user(self, message: str, user_id: str):
        if user_id in self.user_connections:
            for connection in self.user_connections[user_id]:
                await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket, client_id)
    try:
        while True:
            data = await websocket.receive_text()
            # Echo back to sender
            await manager.send_personal_message(f"Echo: {data}", websocket)
            # Broadcast to all
            await manager.broadcast(f"Client {client_id} says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket, client_id)

Authentication with WebSockets

from fastapi import WebSocket, WebSocketException, status
from jose import jwt
from app.core.config import settings

async def get_current_user_from_token(token: str):
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
        user_id: str = payload.get("sub")
        if user_id is None:
            return None
        return user_id
    except Exception:
        return None

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, token: str = None):
    if not token:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    
    user_id = await get_current_user_from_token(token)
    if not user_id:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    
    await manager.connect(websocket, user_id)
    try:
        while True:
            data = await websocket.receive_text()
            # Process message
            await manager.send_to_user(f"Your message: {data}", user_id)
    except WebSocketDisconnect:
        manager.disconnect(websocket, user_id)

Real-Time Chat Application

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
from typing import List
import json
from datetime import datetime

app = FastAPI()

class ChatMessage(BaseModel):
    user: str
    message: str
    timestamp: datetime

class ChatManager:
    def __init__(self):
        self.connections: List[WebSocket] = []
        self.messages: List[ChatMessage] = []
    
    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.connections.append(websocket)
        
        # Send chat history
        for msg in self.messages[-50:]:  # Last 50 messages
            await websocket.send_json(msg.dict())
    
    def disconnect(self, websocket: WebSocket):
        self.connections.remove(websocket)
    
    async def broadcast(self, message: ChatMessage):
        self.messages.append(message)
        message_dict = message.dict()
        
        for connection in self.connections:
            try:
                await connection.send_json(message_dict)
            except:
                pass

chat_manager = ChatManager()

@app.websocket("/ws/chat")
async def chat_endpoint(websocket: WebSocket):
    await chat_manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_json()
            message = ChatMessage(
                user=data["user"],
                message=data["message"],
                timestamp=datetime.now()
            )
            await chat_manager.broadcast(message)
    except WebSocketDisconnect:
        chat_manager.disconnect(websocket)

WebSockets in Django with Channels

Setting Up Django Channels

pip install channels channels-redis
# settings.py
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'channels',
    'myapp',
]

ASGI_APPLICATION = 'myproject.asgi.application'

CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}

Basic WebSocket Consumer

# consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
import json

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'
        
        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        
        await self.accept()
    
    async def disconnect(self, close_code):
        # Leave room group
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )
    
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']
        user = text_data_json['user']
        
        # Send message to room group
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message,
                'user': user
            }
        )
    
    async def chat_message(self, event):
        message = event['message']
        user = event['user']
        
        # Send message to WebSocket
        await self.send(text_data=json.dumps({
            'message': message,
            'user': user
        }))

Routing WebSocket Connections

# routing.py
from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]

# asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import myapp.routing

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(
            myapp.routing.websocket_urlpatterns
        )
    ),
})

Advanced Patterns

1. Real-Time Notifications

# FastAPI
from fastapi import WebSocket
import asyncio

class NotificationManager:
    def __init__(self):
        self.connections: Dict[str, List[WebSocket]] = {}
    
    async def connect(self, websocket: WebSocket, user_id: str):
        await websocket.accept()
        if user_id not in self.connections:
            self.connections[user_id] = []
        self.connections[user_id].append(websocket)
    
    async def send_notification(self, user_id: str, notification: dict):
        if user_id in self.connections:
            for connection in self.connections[user_id]:
                try:
                    await connection.send_json(notification)
                except:
                    self.connections[user_id].remove(connection)

notification_manager = NotificationManager()

@app.websocket("/ws/notifications/{user_id}")
async def notification_endpoint(websocket: WebSocket, user_id: str):
    await notification_manager.connect(websocket, user_id)
    try:
        while True:
            # Keep connection alive
            await websocket.receive_text()
    except WebSocketDisconnect:
        notification_manager.connections[user_id].remove(websocket)

# Send notification from anywhere
async def create_notification(user_id: str, message: str):
    await notification_manager.send_notification(user_id, {
        "type": "notification",
        "message": message,
        "timestamp": datetime.now().isoformat()
    })

2. Live Data Updates

# FastAPI - Live stock prices example
import asyncio
from typing import Dict, List

class LiveDataManager:
    def __init__(self):
        self.connections: List[WebSocket] = []
        self.running = False
    
    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.connections.append(websocket)
        
        if not self.running:
            self.running = True
            asyncio.create_task(self.broadcast_updates())
    
    def disconnect(self, websocket: WebSocket):
        self.connections.remove(websocket)
    
    async def broadcast_updates(self):
        while self.connections:
            # Simulate data updates
            data = {
                "timestamp": datetime.now().isoformat(),
                "prices": {
                    "AAPL": 150.25,
                    "GOOGL": 2800.50,
                    "MSFT": 350.75
                }
            }
            
            for connection in self.connections[:]:  # Copy list
                try:
                    await connection.send_json(data)
                except:
                    self.connections.remove(connection)
            
            await asyncio.sleep(1)  # Update every second

live_data_manager = LiveDataManager()

@app.websocket("/ws/live-data")
async def live_data_endpoint(websocket: WebSocket):
    await live_data_manager.connect(websocket)
    try:
        while True:
            await websocket.receive_text()  # Keep connection alive
    except WebSocketDisconnect:
        live_data_manager.disconnect(websocket)

3. Collaborative Editing

# FastAPI - Simple collaborative editor
from typing import Dict
import json

class DocumentManager:
    def __init__(self):
        self.documents: Dict[str, Dict] = {}
        self.connections: Dict[str, List[WebSocket]] = {}
    
    async def connect(self, websocket: WebSocket, doc_id: str):
        await websocket.accept()
        
        if doc_id not in self.connections:
            self.connections[doc_id] = []
            self.documents[doc_id] = {"content": "", "version": 0}
        
        self.connections[doc_id].append(websocket)
        
        # Send current document state
        await websocket.send_json({
            "type": "document_state",
            "content": self.documents[doc_id]["content"],
            "version": self.documents[doc_id]["version"]
        })
    
    def disconnect(self, websocket: WebSocket, doc_id: str):
        if doc_id in self.connections:
            self.connections[doc_id].remove(websocket)
    
    async def update_document(self, doc_id: str, content: str, user_id: str):
        if doc_id in self.documents:
            self.documents[doc_id]["content"] = content
            self.documents[doc_id]["version"] += 1
            
            # Broadcast to all connected clients
            update = {
                "type": "document_update",
                "content": content,
                "version": self.documents[doc_id]["version"],
                "user": user_id
            }
            
            if doc_id in self.connections:
                for connection in self.connections[doc_id][:]:
                    try:
                        await connection.send_json(update)
                    except:
                        self.connections[doc_id].remove(connection)

document_manager = DocumentManager()

@app.websocket("/ws/document/{doc_id}")
async def document_endpoint(websocket: WebSocket, doc_id: str, user_id: str):
    await document_manager.connect(websocket, doc_id)
    try:
        while True:
            data = await websocket.receive_json()
            
            if data["type"] == "update":
                await document_manager.update_document(
                    doc_id,
                    data["content"],
                    user_id
                )
    except WebSocketDisconnect:
        document_manager.disconnect(websocket, doc_id)

Error Handling

from fastapi import WebSocket, WebSocketException
from starlette import status

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    try:
        await websocket.accept()
        
        while True:
            try:
                data = await websocket.receive_text()
                # Process data
                await websocket.send_text(f"Echo: {data}")
            except WebSocketException as e:
                print(f"WebSocket error: {e}")
                break
    except Exception as e:
        print(f"Connection error: {e}")
        await websocket.close(code=status.WS_1006_ABNORMAL_CLOSURE)

Testing WebSockets

Testing FastAPI WebSockets

from fastapi.testclient import TestClient
import pytest

def test_websocket():
    client = TestClient(app)
    with client.websocket_connect("/ws") as websocket:
        websocket.send_text("Hello")
        data = websocket.receive_text()
        assert data == "Message text was: Hello"

Testing Django Channels

from channels.testing import WebsocketCommunicator
from myapp.consumers import ChatConsumer
import pytest

@pytest.mark.asyncio
async def test_chat_consumer():
    communicator = WebsocketCommunicator(ChatConsumer.as_asgi(), "/ws/chat/test/")
    connected, subprotocol = await communicator.connect()
    assert connected
    
    # Send message
    await communicator.send_json_to({
        "message": "Hello",
        "user": "test_user"
    })
    
    # Receive message
    response = await communicator.receive_json_from()
    assert response["message"] == "Hello"
    
    await communicator.disconnect()

Best Practices

  1. Handle disconnections gracefully - Always clean up connections
  2. Implement authentication - Verify user identity before accepting connections
  3. Use connection managers - Centralize connection handling
  4. Implement reconnection logic - Handle network interruptions
  5. Rate limiting - Prevent abuse with message rate limits
  6. Error handling - Catch and handle WebSocket errors
  7. Heartbeat/ping - Keep connections alive
  8. Message validation - Validate incoming messages
  9. Scalability - Use Redis for distributed systems
  10. Monitoring - Track connection counts and message rates

Conclusion

WebSockets enable real-time, bidirectional communication essential for modern web applications. Whether you're building chat applications, live notifications, or collaborative features, WebSockets provide the foundation for real-time experiences.

🚀 Next Steps: Consider implementing WebSocket authentication, adding message persistence, and scaling with Redis for production deployments.


Resources:

Related Articles:

STAY IN TOUCH

Get notified when I publish something new, and unsubscribe at any time.