Implementing Real-Time Features with WebSockets in FastAPI and Django
Introduction
Real-time features have become essential for modern web applications. Whether it's live chat, notifications, collaborative editing, or live data updates, WebSockets provide a bidirectional communication channel between client and server. In this guide, we'll explore how to implement WebSocket-based real-time features in both FastAPI and Django.
🔌 Why WebSockets? Unlike HTTP's request-response model, WebSockets maintain a persistent connection, enabling real-time, bidirectional communication with low latency.
Understanding WebSockets
How WebSockets Work
- Handshake: Client initiates connection with HTTP upgrade request
- Connection: Server accepts and upgrades to WebSocket protocol
- Communication: Bidirectional message exchange
- Closure: Either side can close the connection
WebSocket vs HTTP
- HTTP: Request-response, stateless, higher overhead
- WebSocket: Persistent connection, stateful, lower overhead, real-time
WebSockets in FastAPI
Basic WebSocket Endpoint
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
app = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept the connection and echo each text frame back with a prefix.

    Loops until ``WebSocketDisconnect`` is raised, then logs and returns.
    """
    await websocket.accept()
    while True:
        try:
            incoming = await websocket.receive_text()
            reply = f"Message text was: {incoming}"
            await websocket.send_text(reply)
        except WebSocketDisconnect:
            print("Client disconnected")
            break
# Client-side (JavaScript)
// Connect to the echo endpoint and log each lifecycle event of the socket.
const ws = new WebSocket("ws://localhost:8000/ws");

// Greet the server as soon as the connection is open.
const handleOpen = () => {
  console.log("Connected");
  ws.send("Hello Server!");
};

// Log every message pushed by the server.
const handleMessage = ({ data }) => {
  console.log("Received:", data);
};

const handleError = (error) => {
  console.error("Error:", error);
};

const handleClose = () => {
  console.log("Disconnected");
};

ws.onopen = handleOpen;
ws.onmessage = handleMessage;
ws.onerror = handleError;
ws.onclose = handleClose;
Connection Manager
Create a connection manager to handle multiple clients:
from fastapi import WebSocket
from typing import List, Dict
import json
class ConnectionManager:
    """Track accepted WebSocket connections, both globally and per user.

    ``active_connections`` holds every accepted socket; ``user_connections``
    maps a user id to the sockets registered under it.
    """

    def __init__(self):
        self.active_connections: List[WebSocket] = []
        self.user_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: str = None):
        """Accept *websocket* and register it, optionally under *user_id*."""
        await websocket.accept()
        self.active_connections.append(websocket)
        if user_id:
            self.user_connections.setdefault(user_id, []).append(websocket)

    def disconnect(self, websocket: WebSocket, user_id: str = None):
        """Forget *websocket*; safe to call for a socket that is already gone.

        When *user_id* is omitted, every user's list is searched so the
        per-user index never keeps a socket the global list has dropped.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

        owners = [user_id] if user_id else list(self.user_connections)
        for owner in owners:
            sockets = self.user_connections.get(owner)
            if sockets is None:
                continue
            if websocket in sockets:
                sockets.remove(websocket)
            if not sockets:
                del self.user_connections[owner]

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send *message* to a single socket; send errors propagate."""
        await websocket.send_text(message)

    async def _send_to_all(self, message: str, connections: List[WebSocket]):
        """Send *message* to each socket, dropping the ones whose send fails."""
        # Iterate a snapshot: other handlers may connect/disconnect while
        # this coroutine is suspended in ``await``.
        for connection in list(connections):
            try:
                await connection.send_text(message)
            except Exception:
                # One broken peer must not stop delivery to the others.
                self.disconnect(connection)

    async def broadcast(self, message: str):
        """Send *message* to every registered socket."""
        await self._send_to_all(message, self.active_connections)

    async def send_to_user(self, message: str, user_id: str):
        """Send *message* to every socket registered under *user_id*."""
        await self._send_to_all(message, self.user_connections.get(user_id, []))
manager = ConnectionManager()


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    """Register the client, then echo and broadcast every text frame it sends.

    The connection is unregistered when ``WebSocketDisconnect`` is raised.
    """
    await manager.connect(websocket, client_id)
    while True:
        try:
            text = await websocket.receive_text()
            # Reply to the sender first, then fan out to everyone.
            await manager.send_personal_message(f"Echo: {text}", websocket)
            await manager.broadcast(f"Client {client_id} says: {text}")
        except WebSocketDisconnect:
            manager.disconnect(websocket, client_id)
            break
Authentication with WebSockets
from fastapi import WebSocket, WebSocketException, status
from jose import jwt
from app.core.config import settings
async def get_current_user_from_token(token: str):
    """Return the ``sub`` claim of *token*, or ``None`` if it cannot be read.

    Any exception raised while decoding is treated as "no user".
    """
    try:
        claims = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
        subject: str = claims.get("sub")
    except Exception:
        return None
    # A missing claim yields None, the same value used for decode failures.
    return subject
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, token: str = None):
    """Serve a connection only when *token* resolves to a user id.

    A missing or unresolvable token closes the socket with the
    policy-violation code before the connection is registered.
    """
    user_id = await get_current_user_from_token(token) if token else None
    if not user_id:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    await manager.connect(websocket, user_id)
    while True:
        try:
            text = await websocket.receive_text()
            # Process message
            await manager.send_to_user(f"Your message: {text}", user_id)
        except WebSocketDisconnect:
            manager.disconnect(websocket, user_id)
            break
Real-Time Chat Application
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
from typing import List
import json
from datetime import datetime
app = FastAPI()
class ChatMessage(BaseModel):
    # One chat line: who sent it, what they said, and when.
    user: str  # sender name taken from the client's payload
    message: str  # message text taken from the client's payload
    timestamp: datetime  # filled in by the server (see chat_endpoint)
class ChatManager:
    """Keep the open chat sockets and the message history.

    New clients are sent the most recent ``HISTORY_REPLAY_LIMIT`` messages.
    """

    HISTORY_REPLAY_LIMIT = 50

    def __init__(self):
        self.connections: List[WebSocket] = []
        self.messages: List[ChatMessage] = []

    @staticmethod
    def _to_payload(message: ChatMessage) -> dict:
        """Return a JSON-serializable dict for *message*.

        ``send_json`` encodes with the standard JSON encoder, which rejects
        ``datetime`` objects, so the timestamp is sent as an ISO 8601 string.
        """
        return {
            "user": message.user,
            "message": message.message,
            "timestamp": message.timestamp.isoformat(),
        }

    async def connect(self, websocket: WebSocket):
        """Accept *websocket*, register it and replay recent history to it."""
        await websocket.accept()
        self.connections.append(websocket)
        # Send chat history
        for msg in self.messages[-self.HISTORY_REPLAY_LIMIT:]:
            await websocket.send_json(self._to_payload(msg))

    def disconnect(self, websocket: WebSocket):
        """Forget *websocket*; a socket that is already gone is ignored."""
        if websocket in self.connections:
            self.connections.remove(websocket)

    async def broadcast(self, message: ChatMessage):
        """Store *message* and deliver it to every connected client."""
        self.messages.append(message)
        # Build the payload outside the try so a serialization bug surfaces
        # instead of being mistaken for a dead connection.
        payload = self._to_payload(message)
        # Iterate a snapshot: the list can change while a send is awaited.
        for connection in list(self.connections):
            try:
                await connection.send_json(payload)
            except Exception:
                # Best-effort delivery: drop the peer that can no longer be
                # reached and keep going.
                self.disconnect(connection)
chat_manager = ChatManager()


@app.websocket("/ws/chat")
async def chat_endpoint(websocket: WebSocket):
    """Join the chat and broadcast every JSON message the client sends.

    Each frame must be a JSON object with ``user`` and ``message`` keys; the
    server adds the timestamp. Errors other than ``WebSocketDisconnect``
    propagate to the framework.
    """
    await chat_manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_json()
            message = ChatMessage(
                user=data["user"],
                message=data["message"],
                timestamp=datetime.now(),
            )
            await chat_manager.broadcast(message)
    except WebSocketDisconnect:
        pass
    finally:
        # Unregister on every exit path: invalid JSON or a missing key also
        # ends the loop, and the socket must not stay in the manager.
        chat_manager.disconnect(websocket)
WebSockets in Django with Channels
Setting Up Django Channels
pip install channels channels-redis
# settings.py
# Installed apps; "channels" is listed alongside the project's own app.
INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "channels",
    "myapp",
]

# Dotted path to the project's ASGI application object.
ASGI_APPLICATION = "myproject.asgi.application"

# Channel layer backend and the Redis host/port it connects to.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
        },
    },
}
Basic WebSocket Consumer
# consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
import json
class ChatConsumer(AsyncWebsocketConsumer):
    """Relay chat messages between all sockets connected to the same room.

    The room name comes from the URL route; every consumer in a room joins
    the channel-layer group ``chat_<room_name>``.
    """

    async def connect(self):
        """Join the room group named in the URL, then accept the socket."""
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'
        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        """Leave the room group."""
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    async def receive(self, text_data=None, bytes_data=None):
        """Validate an incoming frame and forward it to the room group.

        Binary frames, invalid JSON and payloads without ``message`` and
        ``user`` keys are answered with an ``{"error": ...}`` frame instead
        of raising inside the consumer.
        """
        if text_data is None:
            # Binary frames arrive as ``bytes_data``; the chat is text-only.
            await self._send_error("Only text frames are supported")
            return
        try:
            payload = json.loads(text_data)
            message = payload['message']
            user = payload['user']
        except (ValueError, KeyError, TypeError):
            # ValueError: not JSON; KeyError: key missing;
            # TypeError: JSON value is not an object.
            await self._send_error('Expected a JSON object with "message" and "user"')
            return
        # Send message to room group
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message,
                'user': user
            }
        )

    async def chat_message(self, event):
        """Handle a ``chat_message`` group event by sending it to the client."""
        await self.send(text_data=json.dumps({
            'message': event['message'],
            'user': event['user']
        }))

    async def _send_error(self, detail):
        """Tell the client why its last frame was rejected."""
        await self.send(text_data=json.dumps({'error': detail}))
Routing WebSocket Connections
# routing.py
from django.urls import re_path
from . import consumers
# WebSocket routes: ws/chat/<room_name>/ is served by ChatConsumer, with the
# word-character segment captured as the ``room_name`` URL kwarg.
websocket_urlpatterns = [
    re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]
# asgi.py
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# Initialise Django before importing the routing module: routing imports the
# consumers, and anything they import that touches the ORM needs the settings
# and the app registry to be ready first.
django_asgi_app = get_asgi_application()

import myapp.routing  # noqa: E402  (must come after get_asgi_application())

# Dispatch by protocol: plain HTTP goes to Django, WebSocket connections go
# through the auth middleware to the URL router.
application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AuthMiddlewareStack(
        URLRouter(
            myapp.routing.websocket_urlpatterns
        )
    ),
})
Advanced Patterns
1. Real-Time Notifications
# FastAPI
from fastapi import WebSocket
import asyncio
class NotificationManager:
    """Deliver JSON notifications to every socket a user has open.

    ``connections`` maps a user id to the list of sockets registered for it.
    """

    def __init__(self):
        self.connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        """Accept *websocket* and register it under *user_id*."""
        await websocket.accept()
        self.connections.setdefault(user_id, []).append(websocket)

    def disconnect(self, websocket: WebSocket, user_id: str):
        """Unregister *websocket*; unknown users or sockets are ignored."""
        sockets = self.connections.get(user_id)
        if sockets is None:
            return
        if websocket in sockets:
            sockets.remove(websocket)
        if not sockets:
            # Do not keep an empty list around for every user ever seen.
            del self.connections[user_id]

    async def send_notification(self, user_id: str, notification: dict):
        """Send *notification* to each of the user's sockets.

        Sockets whose send fails are unregistered; the remaining sockets
        still receive the notification.
        """
        # Iterate a snapshot: removing from the list being iterated would
        # skip the socket that follows the removed one.
        for connection in list(self.connections.get(user_id, ())):
            try:
                await connection.send_json(notification)
            except Exception:
                self.disconnect(connection, user_id)
notification_manager = NotificationManager()


@app.websocket("/ws/notifications/{user_id}")
async def notification_endpoint(websocket: WebSocket, user_id: str):
    """Hold a notification socket open for *user_id* until it goes away.

    Incoming text is read only to notice the disconnect; it is discarded.
    """
    # NOTE(review): user_id comes straight from the URL, so any client can
    # subscribe to any user's notifications - confirm authentication is
    # enforced before using this pattern.
    await notification_manager.connect(websocket, user_id)
    try:
        while True:
            # Keep connection alive
            await websocket.receive_text()
    except WebSocketDisconnect:
        pass
    finally:
        # The manager may already have dropped this socket after a failed
        # send, so remove it only if it is still registered.
        sockets = notification_manager.connections.get(user_id, [])
        if websocket in sockets:
            sockets.remove(websocket)
# Send notification from anywhere
async def create_notification(user_id: str, message: str):
    """Build a timestamped notification and send it to *user_id*'s sockets."""
    payload = {
        "type": "notification",
        "message": message,
        "timestamp": datetime.now().isoformat(),
    }
    await notification_manager.send_notification(user_id, payload)
2. Live Data Updates
# FastAPI - Live stock prices example
import asyncio
from typing import Dict, List
class LiveDataManager:
    """Push a data snapshot to every connected client at a fixed interval.

    A single background task does the broadcasting. It starts with the first
    client, stops once no clients remain, and starts again on the next
    connect.
    """

    def __init__(self, interval: float = 1.0):
        """Create a manager that broadcasts every *interval* seconds."""
        self.connections: List[WebSocket] = []
        self.running = False
        self.interval = interval
        # Keep a reference so the broadcaster task is not garbage-collected
        # while it is still running.
        self._task = None

    async def connect(self, websocket: WebSocket):
        """Accept *websocket* and make sure the broadcaster is running."""
        await websocket.accept()
        self.connections.append(websocket)
        if not self.running:
            self.running = True
            self._task = asyncio.create_task(self.broadcast_updates())

    def disconnect(self, websocket: WebSocket):
        """Forget *websocket*; a socket that is already gone is ignored."""
        if websocket in self.connections:
            self.connections.remove(websocket)

    async def broadcast_updates(self):
        """Send updates until the last client has disconnected."""
        try:
            while self.connections:
                # Simulate data updates
                data = {
                    "timestamp": datetime.now().isoformat(),
                    "prices": {
                        "AAPL": 150.25,
                        "GOOGL": 2800.50,
                        "MSFT": 350.75
                    }
                }
                for connection in list(self.connections):  # Copy list
                    try:
                        await connection.send_json(data)
                    except Exception:
                        # Drop unreachable peers; disconnect() tolerates a
                        # socket the endpoint has already removed.
                        self.disconnect(connection)
                await asyncio.sleep(self.interval)
        finally:
            # Without this reset, connect() would never start a new
            # broadcaster after the loop had ended once.
            self.running = False
live_data_manager = LiveDataManager()


@app.websocket("/ws/live-data")
async def live_data_endpoint(websocket: WebSocket):
    """Subscribe the client to live updates until it disconnects.

    Incoming text is read and discarded; the read exists to notice the
    disconnect.
    """
    await live_data_manager.connect(websocket)
    while True:
        try:
            await websocket.receive_text()  # Keep connection alive
        except WebSocketDisconnect:
            live_data_manager.disconnect(websocket)
            break
3. Collaborative Editing
# FastAPI - Simple collaborative editor
from typing import Dict
import json
class DocumentManager:
    """Hold shared documents and the sockets currently editing each one.

    ``documents`` maps a document id to ``{"content", "version"}``;
    ``connections`` maps a document id to its connected sockets.
    """

    def __init__(self):
        self.documents: Dict[str, Dict] = {}
        self.connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, doc_id: str):
        """Accept *websocket*, register it and send it the current state.

        The document is created empty at version 0 on first use.
        """
        await websocket.accept()
        self.documents.setdefault(doc_id, {"content": "", "version": 0})
        self.connections.setdefault(doc_id, []).append(websocket)
        # Send current document state
        document = self.documents[doc_id]
        await websocket.send_json({
            "type": "document_state",
            "content": document["content"],
            "version": document["version"]
        })

    def disconnect(self, websocket: WebSocket, doc_id: str):
        """Unregister *websocket*; unknown documents or sockets are ignored."""
        sockets = self.connections.get(doc_id, [])
        # The socket may already have been dropped by update_document().
        if websocket in sockets:
            sockets.remove(websocket)

    async def update_document(self, doc_id: str, content: str, user_id: str):
        """Replace the content, bump the version and notify every editor.

        Does nothing when *doc_id* is unknown. Sockets whose send fails are
        unregistered.
        """
        if doc_id not in self.documents:
            return
        document = self.documents[doc_id]
        document["content"] = content
        document["version"] += 1
        # Broadcast to all connected clients
        update = {
            "type": "document_update",
            "content": content,
            "version": document["version"],
            "user": user_id
        }
        for connection in list(self.connections.get(doc_id, ())):
            try:
                await connection.send_json(update)
            except Exception:
                self.disconnect(connection, doc_id)
document_manager = DocumentManager()


@app.websocket("/ws/document/{doc_id}")
async def document_endpoint(websocket: WebSocket, doc_id: str, user_id: str):
    """Join the editing session for *doc_id* and apply the client's updates.

    Only JSON objects of the form ``{"type": "update", "content": ...}`` are
    applied; any other frame is ignored.
    """
    await document_manager.connect(websocket, doc_id)
    try:
        while True:
            data = await websocket.receive_json()
            is_update = isinstance(data, dict) and data.get("type") == "update"
            if is_update and "content" in data:
                await document_manager.update_document(
                    doc_id,
                    data["content"],
                    user_id
                )
    except WebSocketDisconnect:
        pass
    finally:
        # Unregister on every exit path so a failure while reading or
        # applying a frame does not leave the socket in the manager.
        document_manager.disconnect(websocket, doc_id)
Error Handling
from fastapi import WebSocket, WebSocketException
from starlette import status
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Echo text frames, handling each kind of failure separately.

    * client disconnect: log and return; the socket is already closed
    * ``WebSocketException``: log and stop serving
    * anything else: log and close with 1011 (internal error)
    """
    try:
        await websocket.accept()
        while True:
            data = await websocket.receive_text()
            # Process data
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        # Must come before ``except Exception``: the peer is gone, so there
        # is nothing left to close.
        print("Client disconnected")
    except WebSocketException as e:
        print(f"WebSocket error: {e}")
    except Exception as e:
        print(f"Connection error: {e}")
        # 1006 is reserved for reporting a connection that dropped without a
        # close frame and must not be sent; 1011 is the code for a server
        # error.
        await websocket.close(code=status.WS_1011_INTERNAL_ERROR)
Testing WebSockets
Testing FastAPI WebSockets
from fastapi.testclient import TestClient
import pytest
def test_websocket():
    """The echo endpoint returns the sent text with its fixed prefix."""
    with TestClient(app).websocket_connect("/ws") as session:
        session.send_text("Hello")
        assert session.receive_text() == "Message text was: Hello"
Testing Django Channels
from channels.testing import WebsocketCommunicator
from myapp.consumers import ChatConsumer
import pytest
@pytest.mark.asyncio
async def test_chat_consumer():
    """A message sent to a room is delivered back through the room group."""
    # Local imports keep this snippet self-contained.
    from channels.routing import URLRouter
    from myapp.routing import websocket_urlpatterns

    # Go through the URL router so scope["url_route"] is populated; the
    # consumer reads the room name from it in connect().
    application = URLRouter(websocket_urlpatterns)
    communicator = WebsocketCommunicator(application, "/ws/chat/test/")
    connected, _subprotocol = await communicator.connect()
    assert connected
    try:
        # Send message
        await communicator.send_json_to({
            "message": "Hello",
            "user": "test_user"
        })
        # Receive message
        response = await communicator.receive_json_from()
        assert response["message"] == "Hello"
        assert response["user"] == "test_user"
    finally:
        # Always disconnect, even when an assertion above fails.
        await communicator.disconnect()
Best Practices
- Handle disconnections gracefully - Always clean up connections
- Implement authentication - Verify user identity before accepting connections
- Use connection managers - Centralize connection handling
- Implement reconnection logic - Handle network interruptions
- Rate limiting - Prevent abuse with message rate limits
- Error handling - Catch and handle WebSocket errors
- Heartbeat/ping - Send periodic ping messages so dead connections are detected and idle ones are not dropped by proxies or load balancers
- Message validation - Validate incoming messages
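- Scalability - In-memory connection managers only reach clients connected to the same process; use Redis (pub/sub or a channel layer) to deliver messages across multiple workers or servers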
- Monitoring - Track connection counts and message rates
Conclusion
WebSockets enable real-time, bidirectional communication essential for modern web applications. Whether you're building chat applications, live notifications, or collaborative features, WebSockets provide the foundation for real-time experiences.
🚀 Next Steps: Consider implementing WebSocket authentication, adding message persistence, and scaling with Redis for production deployments.
Resources:
Related Articles: