Building AI Agents with LangChain and FastAPI: A Complete Guide
Introduction
AI agents are programs that use a language model to reason about a task, decide which tools to use, and take actions to complete it — autonomously, step by step. In 2025-2026, building and deploying AI agents has become a core skill for backend developers.
LangChain is the most widely used Python framework for building LLM-powered applications. FastAPI is the best way to expose those applications as production-grade REST APIs. Together, they give you a powerful stack for building and serving AI agents at scale.
In this guide, you'll build a production-ready AI agent using LangChain, expose it as a FastAPI endpoint with streaming support, add memory for multi-turn conversations, and learn how to test and deploy it.
🔗 Make sure your Python environment is set up: uv: The Fast Python Package Manager Replacing pip in 2026
What Are AI Agents and Why Do They Matter?
A traditional LLM call takes an input and returns an output in one step. An agent is different — it uses the LLM's reasoning capabilities to decide what to do next, which might involve:
- Calling a tool (searching the web, querying a database, running code)
- Processing the tool's output
- Calling another tool if needed
- Repeating until the task is complete
- Returning a final answer
This "think → act → observe → think" loop is what makes agents powerful: they can handle complex, multi-step tasks that a single LLM call cannot.
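Stripped of framework details, that loop fits in a few lines of plain Python. This is an illustrative sketch, not LangChain's implementation: `llm_decide` is a hypothetical stand-in for a real LLM call, and `tools` is just a dict of callables.

```python
# A minimal sketch of the agent loop: think -> act -> observe -> repeat.
# llm_decide stands in for a real LLM call; tools is a plain dict of callables.

def run_agent(task: str, llm_decide, tools: dict, max_steps: int = 5) -> str:
    observations: list[str] = []
    for _ in range(max_steps):
        # "Think": ask the model what to do next, given everything seen so far
        decision = llm_decide(task, observations)
        if decision["action"] == "finish":
            return decision["answer"]
        # "Act": call the chosen tool, then "observe" its output
        tool = tools[decision["action"]]
        observations.append(tool(decision["input"]))
    return "Stopped: step limit reached"
```

Everything LangChain adds on top of this — prompt formatting, output parsing, error handling, limits — is machinery around the same loop.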
When to Use Agents vs. Simple LLM Calls
| Use case | Approach |
|---|---|
| Answer a factual question from context | Simple chain |
| Summarize a document | Simple chain |
| Research a topic (needs web search) | Agent |
| Debug code (needs to read files, run code) | Agent |
| Book a calendar event (needs tools + reasoning) | Agent |
| Complex data analysis over multiple tables | Agent with RAG |
Understanding LangChain's Agent Architecture
LangChain's agent architecture consists of several components:
Core Components
- LLM / Chat Model: The underlying language model (OpenAI, Anthropic, local models)
- Tools: Python functions the agent can call, described with a name, description, and input schema
- Agent: The reasoning loop that decides which tools to call and in what order
- AgentExecutor: The runtime that runs the agent loop, handles errors, and enforces limits
- Memory: Stores conversation history for multi-turn interactions
The ReAct Pattern
LangChain's most common agent type uses the ReAct (Reasoning + Acting) pattern:
```text
Thought: I need to find the current weather in London
Action: web_search
Action Input: "current weather London"
Observation: Weather in London: 12°C, partly cloudy
Thought: I have the information. I can now answer the user's question.
Final Answer: The current weather in London is 12°C and partly cloudy.
```
LangChain Expression Language (LCEL)
Modern LangChain uses LCEL to compose chains using the | operator:
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

llm = ChatOpenAI(model="gpt-4o")

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{input}"),
])

parser = StrOutputParser()
chain = prompt | llm | parser

result = chain.invoke({"input": "What is FastAPI?"})
```
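There is no magic in the `|` operator: each Runnable overloads `__or__` to produce a composed pipeline. A toy reimplementation (not LangChain's actual classes) makes the idea concrete:

```python
# Toy version of LCEL-style composition: each step overloads | to build a pipeline.
class Step:
    def __init__(self, fn):
        self.fn = fn

    def __or__(self, other: "Step") -> "Step":
        # (a | b) is a new step that runs a, then feeds the result into b
        return Step(lambda x: other.fn(self.fn(x)))

    def invoke(self, x):
        return self.fn(x)


prompt = Step(lambda d: f"Q: {d['input']}")
llm = Step(lambda p: p.upper())   # stand-in for a model call
parser = Step(lambda s: s.strip())

chain = prompt | llm | parser
print(chain.invoke({"input": "what is fastapi?"}))  # Q: WHAT IS FASTAPI?
```

Real LCEL Runnables add batching, streaming, and async variants on top of this same composition idea.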
Project Setup: FastAPI + LangChain + LLM Provider
Install Dependencies
```bash
uv init langchain-agent-api
cd langchain-agent-api

uv add fastapi "uvicorn[standard]" \
    langchain langchain-openai langchain-community \
    pydantic-settings python-dotenv \
    "redis[hiredis]"

uv add --dev pytest pytest-asyncio httpx ruff
```
Project Structure
```text
langchain-agent-api/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI app
│   ├── agents/
│   │   ├── __init__.py
│   │   ├── base.py          # Base agent setup
│   │   └── research.py      # Research agent
│   ├── tools/
│   │   ├── __init__.py
│   │   ├── search.py        # Web search tool
│   │   └── calculator.py    # Math tool
│   ├── memory/
│   │   └── redis_memory.py  # Redis-backed conversation memory
│   ├── rag/
│   │   └── pipeline.py      # RAG pipeline (added later in this guide)
│   └── core/
│       ├── config.py
│       └── dependencies.py
├── pyproject.toml
└── .env
```
Configuration
```python
# app/core/config.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    OPENAI_API_KEY: str
    TAVILY_API_KEY: str = ""  # used by the web_search tool
    REDIS_URL: str = "redis://localhost:6379/0"
    AGENT_MAX_ITERATIONS: int = 10
    AGENT_TIMEOUT: int = 60  # seconds

    class Config:
        env_file = ".env"


settings = Settings()
```

```ini
# .env
OPENAI_API_KEY=sk-...
TAVILY_API_KEY=...
REDIS_URL=redis://localhost:6379/0
```
Building Your First ReAct Agent with Tool Calling
```python
# app/tools/search.py
import ast
import operator

import httpx
from langchain_core.tools import tool

from app.core.config import settings


@tool
async def web_search(query: str) -> str:
    """Search the web for current information on a topic.

    Use this when you need up-to-date information or facts you're not sure about.
    """
    # In production, use a real search API (Tavily, SerpAPI, etc.)
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://api.tavily.com/search",
            params={"query": query, "max_results": 3},
            headers={"Authorization": f"Bearer {settings.TAVILY_API_KEY}"},
        )
    results = response.json()["results"]
    return "\n\n".join(
        f"**{r['title']}**\n{r['content']}" for r in results
    )


@tool
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression. Input should be a valid Python math expression.

    Example: '2 ** 10', '(15 + 7) * 3'
    """
    # Walk the AST with a whitelist of operators instead of calling eval(),
    # which would let the model execute arbitrary code.
    allowed_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.Pow: operator.pow,
        ast.USub: operator.neg,
    }

    def eval_expr(node):
        if isinstance(node, ast.Constant):
            return node.value
        elif isinstance(node, ast.BinOp):
            return allowed_ops[type(node.op)](eval_expr(node.left), eval_expr(node.right))
        elif isinstance(node, ast.UnaryOp):
            return allowed_ops[type(node.op)](eval_expr(node.operand))
        raise ValueError("Unsupported expression")

    try:
        tree = ast.parse(expression, mode="eval")
        result = eval_expr(tree.body)
        return str(result)
    except Exception as e:
        return f"Error evaluating expression: {e}"
```
```python
# app/agents/base.py
from langchain import hub
from langchain.agents import AgentExecutor, create_react_agent
from langchain_openai import ChatOpenAI

from app.core.config import settings
from app.tools.search import calculate, web_search


def create_agent_executor() -> AgentExecutor:
    llm = ChatOpenAI(
        model="gpt-4o",
        temperature=0,
        api_key=settings.OPENAI_API_KEY,
    )
    tools = [web_search, calculate]

    # Pull the conversational ReAct prompt from the LangChain hub.
    # "hwchase17/react-chat" accepts a chat_history variable; the plain
    # "hwchase17/react" prompt does not, so prior turns would be ignored.
    prompt = hub.pull("hwchase17/react-chat")

    agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
    return AgentExecutor(
        agent=agent,
        tools=tools,
        max_iterations=settings.AGENT_MAX_ITERATIONS,
        max_execution_time=settings.AGENT_TIMEOUT,
        handle_parsing_errors=True,
        verbose=True,
    )
```
Adding Memory: Conversation History and Context Windows
For multi-turn conversations, the agent needs to remember previous exchanges. We'll use Redis to store conversation history:
```python
# app/memory/redis_memory.py
import json

import redis.asyncio as aioredis
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage

from app.core.config import settings


class RedisConversationMemory:
    def __init__(self, session_id: str, ttl: int = 3600):
        self.session_id = session_id
        self.ttl = ttl
        self.key = f"chat:history:{session_id}"
        self._redis: aioredis.Redis | None = None

    async def get_redis(self) -> aioredis.Redis:
        if self._redis is None:
            self._redis = aioredis.from_url(settings.REDIS_URL, decode_responses=True)
        return self._redis

    async def get_messages(self) -> list[BaseMessage]:
        redis = await self.get_redis()
        raw = await redis.get(self.key)
        if not raw:
            return []
        messages = []
        for item in json.loads(raw):
            if item["role"] == "human":
                messages.append(HumanMessage(content=item["content"]))
            elif item["role"] == "ai":
                messages.append(AIMessage(content=item["content"]))
        return messages

    async def add_messages(self, human_msg: str, ai_msg: str):
        redis = await self.get_redis()
        messages = await self.get_messages()
        messages.append(HumanMessage(content=human_msg))
        messages.append(AIMessage(content=ai_msg))
        # Keep the last 20 messages to avoid exceeding the context window
        messages = messages[-20:]
        serialized = [
            {"role": "human" if isinstance(m, HumanMessage) else "ai", "content": m.content}
            for m in messages
        ]
        await redis.setex(self.key, self.ttl, json.dumps(serialized))

    async def clear(self):
        redis = await self.get_redis()
        await redis.delete(self.key)
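Keeping a fixed message count is a blunt instrument: context windows are measured in tokens, not messages. A rough token-budget trim is a common refinement; this sketch uses the ~4 characters per token heuristic, so swap in a real tokenizer (e.g. tiktoken) for accurate counts:

```python
def trim_history(messages: list[dict], max_tokens: int = 2000) -> list[dict]:
    """Keep the most recent messages that fit a rough token budget.

    Uses the ~4 characters per token heuristic; replace the cost estimate
    with a real tokenizer for production use.
    """
    kept: list[dict] = []
    budget = max_tokens
    for msg in reversed(messages):           # walk newest-first
        cost = max(1, len(msg["content"]) // 4)
        if cost > budget:
            break                            # oldest messages fall off first
        kept.append(msg)
        budget -= cost
    return list(reversed(kept))              # restore chronological order
```

A token budget degrades more gracefully than a message count: one very long message costs proportionally more of the window.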
🔗 See: Redis Caching in Django and FastAPI: A Practical Guide
Building a RAG Pipeline with a Vector Store
Retrieval-Augmented Generation (RAG) lets your agent answer questions based on your own documents:
```python
# app/rag/pipeline.py
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_core.tools import Tool
from langchain_openai import OpenAIEmbeddings


class RAGPipeline:
    def __init__(self, persist_directory: str = "./chroma_db"):
        self.embeddings = OpenAIEmbeddings()
        self.persist_directory = persist_directory
        self._vectorstore: Chroma | None = None

    def get_vectorstore(self) -> Chroma:
        if self._vectorstore is None:
            self._vectorstore = Chroma(
                persist_directory=self.persist_directory,
                embedding_function=self.embeddings,
            )
        return self._vectorstore

    def ingest_documents(self, documents: list[Document]):
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )
        chunks = splitter.split_documents(documents)
        vectorstore = self.get_vectorstore()
        vectorstore.add_documents(chunks)

    def as_retriever_tool(self) -> Tool:
        retriever = self.get_vectorstore().as_retriever(
            search_type="mmr",  # maximal marginal relevance: diverse results
            search_kwargs={"k": 5, "fetch_k": 20},
        )

        def retrieve(query: str) -> str:
            docs = retriever.invoke(query)
            return "\n\n".join(doc.page_content for doc in docs)

        return Tool(
            name="search_knowledge_base",
            description=(
                "Search the internal knowledge base for relevant information. "
                "Use this before searching the web for topics the knowledge base might cover."
            ),
            func=retrieve,
        )
```
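RecursiveCharacterTextSplitter tries a hierarchy of separators (paragraphs, then sentences, then characters) to keep chunks semantically coherent. The core idea of overlapping fixed-size windows can be shown with a deliberately simplified character splitter (this is an illustration, not LangChain's implementation):

```python
def split_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> list[str]:
    """Slice text into chunk_size windows that overlap by chunk_overlap.

    Overlap keeps a sentence that straddles a chunk boundary visible in
    both neighboring chunks, so retrieval doesn't lose it.
    """
    step = chunk_size - chunk_overlap
    chunks = [text[i:i + chunk_size] for i in range(0, len(text), step)]
    # Drop a trailing fragment already contained in the previous chunk
    if len(chunks) > 1 and chunks[-2].endswith(chunks[-1]):
        chunks.pop()
    return chunks
```

The chunk_size/chunk_overlap trade-off is the same in the real splitter: larger chunks keep more context per retrieval hit, while more overlap costs storage and embedding calls but reduces boundary losses.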
Exposing the Agent as a FastAPI Endpoint with SSE Streaming
Agents can take time to respond. Streaming the response token-by-token provides a much better user experience:
```python
# app/main.py
import asyncio
from typing import AsyncGenerator

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage
from pydantic import BaseModel

from app.agents.base import create_agent_executor
from app.memory.redis_memory import RedisConversationMemory

app = FastAPI(title="AI Agent API")


class ChatRequest(BaseModel):
    message: str
    session_id: str = "default"


class ChatResponse(BaseModel):
    response: str
    session_id: str


def format_history(history) -> str:
    """Format message history as a string for the ReAct agent."""
    return "\n".join(
        f"{'Human' if isinstance(m, HumanMessage) else 'AI'}: {m.content}"
        for m in history
    )


@app.post("/api/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Non-streaming chat endpoint."""
    memory = RedisConversationMemory(request.session_id)
    history = await memory.get_messages()
    agent_executor = create_agent_executor()

    try:
        result = await asyncio.wait_for(
            agent_executor.ainvoke({
                "input": request.message,
                "chat_history": format_history(history),
            }),
            timeout=60.0,
        )
        response = result["output"]
        await memory.add_messages(request.message, response)
        return ChatResponse(response=response, session_id=request.session_id)
    except asyncio.TimeoutError:
        raise HTTPException(status_code=504, detail="Agent timed out")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
    """Streaming chat endpoint using Server-Sent Events."""

    async def generate() -> AsyncGenerator[str, None]:
        memory = RedisConversationMemory(request.session_id)
        history = await memory.get_messages()
        agent_executor = create_agent_executor()

        full_response = ""
        try:
            async for event in agent_executor.astream_events(
                {"input": request.message, "chat_history": format_history(history)},
                version="v1",
            ):
                kind = event["event"]
                # Chat models emit "on_chat_model_stream"; plain LLMs emit
                # "on_llm_stream" -- handle both.
                if kind in ("on_chat_model_stream", "on_llm_stream"):
                    chunk = event["data"]["chunk"].content
                    if chunk:
                        full_response += chunk
                        yield f"data: {chunk}\n\n"
                elif kind == "on_tool_start":
                    tool_name = event["name"]
                    yield f"data: [Using tool: {tool_name}...]\n\n"

            # Store in memory once the full response has streamed
            await memory.add_messages(request.message, full_response)
            yield "data: [DONE]\n\n"
        except Exception as e:
            yield f"data: [ERROR: {e}]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable proxy buffering (nginx)
        },
    )


@app.delete("/api/chat/{session_id}")
async def clear_conversation(session_id: str):
    """Clear a conversation's history."""
    memory = RedisConversationMemory(session_id)
    await memory.clear()
    return {"message": f"Session {session_id} cleared"}
```
🔗 For WebSocket alternatives to SSE: Implementing Real-Time Features with WebSockets in FastAPI and Django
Handling Agent State: Sessions and Multi-Turn Conversations
Each conversation should be isolated by session_id. Here's a complete example of a multi-turn exchange:
```python
# Client usage
import httpx

BASE_URL = "http://localhost:8000"
SESSION_ID = "user-123-session-1"

with httpx.Client() as client:
    # Turn 1
    r1 = client.post(f"{BASE_URL}/api/chat", json={
        "message": "What is the current price of Bitcoin?",
        "session_id": SESSION_ID,
    })
    print(r1.json()["response"])

    # Turn 2 — agent remembers the context
    r2 = client.post(f"{BASE_URL}/api/chat", json={
        "message": "How does that compare to last year?",
        "session_id": SESSION_ID,
    })
    print(r2.json()["response"])

    # Clear session
    client.delete(f"{BASE_URL}/api/chat/{SESSION_ID}")
```
Error Handling, Retries, and Timeouts in Agentic Workflows
Agents can fail in ways that regular API calls don't: they might loop, hit rate limits, or have tools that fail. Robust error handling is critical:
```python
import asyncio
import logging

from langchain.agents import AgentExecutor, create_react_agent
from langchain_core.exceptions import OutputParserException
from langchain_openai import ChatOpenAI

logger = logging.getLogger(__name__)


def create_robust_agent_executor() -> AgentExecutor:
    llm = ChatOpenAI(
        model="gpt-4o",
        temperature=0,
        # The underlying OpenAI client retries rate-limit errors automatically
        max_retries=3,
    )
    # tools and prompt as defined earlier in app/agents/base.py
    return AgentExecutor(
        agent=create_react_agent(llm=llm, tools=tools, prompt=prompt),
        tools=tools,
        max_iterations=10,             # prevent infinite loops
        max_execution_time=60,         # timeout in seconds
        handle_parsing_errors=True,    # don't crash on malformed LLM output
        early_stopping_method="force", # stop cleanly when limits are hit
    )


@app.post("/api/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    try:
        result = await asyncio.wait_for(
            agent_executor.ainvoke({"input": request.message}),
            timeout=65.0,  # slightly longer than the agent's own timeout
        )
        return ChatResponse(response=result["output"], session_id=request.session_id)
    except asyncio.TimeoutError:
        raise HTTPException(
            status_code=504,
            detail="The agent took too long to respond. Please try a simpler question.",
        )
    except OutputParserException:
        raise HTTPException(
            status_code=500,
            detail="The agent produced an unexpected response format.",
        )
    except Exception as e:
        # Log the full error for debugging
        logger.error(f"Agent error for session {request.session_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="An unexpected error occurred.")
```
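Transient tool failures (network blips, 429s from a search API) are worth retrying with exponential backoff before surfacing an error to the user. Libraries like tenacity provide this declaratively; the underlying pattern, as a plain-Python sketch you could wrap around any async tool call:

```python
import asyncio
import random


async def with_retries(fn, *args, attempts: int = 3, base_delay: float = 0.5):
    """Call an async fn, retrying failures with exponential backoff and jitter."""
    for attempt in range(attempts):
        try:
            return await fn(*args)
        except Exception:
            if attempt == attempts - 1:
                raise  # out of retries: let the caller's error handling take over
            # 0.5s, 1s, 2s, ... plus jitter so concurrent clients don't retry in sync
            await asyncio.sleep(base_delay * 2 ** attempt * (1 + random.random()))
```

Only retry operations that are safe to repeat (searches, reads); a tool with side effects (sending an email, booking an event) needs idempotency keys before it can be retried blindly.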
Async Agents: Running Tools Concurrently with asyncio
When an agent calls multiple tools, running them concurrently can dramatically reduce latency:
```python
import asyncio

from langchain_core.tools import tool

from app.tools.search import web_search


async def parallel_search(queries: list[str]) -> list[str]:
    """Run multiple searches concurrently."""
    tasks = [web_search.ainvoke({"query": q}) for q in queries]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r if isinstance(r, str) else f"Error: {r}" for r in results]


@tool
async def research_topic(topic: str) -> str:
    """Research a topic from multiple angles simultaneously.

    Better than a single search when you need comprehensive information.
    """
    queries = [
        f"{topic} overview",
        f"{topic} latest news 2026",
        f"{topic} technical details",
    ]
    results = await parallel_search(queries)
    return "\n\n---\n\n".join(results)
```
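The `return_exceptions=True` flag is what keeps one failed search from taking down the others: failures come back as exception objects in the results list, in order, instead of propagating and cancelling the remaining tasks. A self-contained demonstration with a fake fetch function:

```python
import asyncio


async def fetch(name: str, fail: bool = False) -> str:
    await asyncio.sleep(0)  # yield control, as a real network call would
    if fail:
        raise ValueError(f"{name} failed")
    return f"{name} ok"


async def main() -> list[str]:
    results = await asyncio.gather(
        fetch("a"), fetch("b", fail=True), fetch("c"),
        return_exceptions=True,  # collect exceptions instead of raising
    )
    # Same normalization as parallel_search: keep strings, stringify errors
    return [r if isinstance(r, str) else f"Error: {r}" for r in results]


print(asyncio.run(main()))  # ['a ok', 'Error: b failed', 'c ok']
```

Without the flag, the first exception would propagate out of `gather` and the other results would be lost, even though their tasks completed.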
🔗 For a deep dive into async patterns: Understanding Async Programming in Python: A Comprehensive Guide
Testing AI Agents
Testing agents requires mocking LLM calls so tests are fast, deterministic, and don't incur API costs:
```python
# tests/test_agent.py
import asyncio
from unittest.mock import AsyncMock, patch

import pytest
from httpx import ASGITransport, AsyncClient

from app.main import app


@pytest.fixture
async def client():
    # Async fixtures require asyncio_mode = "auto" in pytest config,
    # or the @pytest_asyncio.fixture decorator instead of @pytest.fixture
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as c:
        yield c


@pytest.mark.asyncio
async def test_simple_chat(client):
    """Test that the chat endpoint returns a response."""
    mock_executor = AsyncMock()
    mock_executor.ainvoke.return_value = {"output": "Hello! How can I help you today?"}
    # Patch the factory where app.main looks it up, so no LLM call is made
    with patch("app.main.create_agent_executor", return_value=mock_executor):
        response = await client.post("/api/chat", json={
            "message": "Hello",
            "session_id": "test-session",
        })
    assert response.status_code == 200
    assert "response" in response.json()


@pytest.mark.asyncio
async def test_agent_timeout(client):
    """Test that the endpoint handles agent timeouts gracefully."""

    async def slow_invoke(*args, **kwargs):
        await asyncio.sleep(100)  # Simulate a very slow agent

    mock_executor = AsyncMock()
    mock_executor.ainvoke.side_effect = slow_invoke
    # (In a real suite, make the endpoint timeout configurable and shrink it
    # here so this test doesn't wait the full 60 seconds.)
    with patch("app.main.create_agent_executor", return_value=mock_executor):
        response = await client.post("/api/chat", json={
            "message": "Research every topic ever",
            "session_id": "test-timeout",
        })
    assert response.status_code == 504


@pytest.mark.asyncio
async def test_clear_conversation(client):
    """Test clearing a conversation session."""
    response = await client.delete("/api/chat/test-session")
    assert response.status_code == 200
    assert "cleared" in response.json()["message"]
```
🔗 Related: Testing APIs in Django and FastAPI: Best Practices and Tools
Deployment Considerations and Cost Management
Caching LLM Responses
For deterministic queries (same input → same output), cache LLM responses to reduce costs:
```python
import redis
from langchain.globals import set_llm_cache
from langchain_community.cache import RedisCache

from app.core.config import settings

# Cache LLM responses in Redis
set_llm_cache(RedisCache(redis.from_url(settings.REDIS_URL)))
```
🔗 See: Redis Caching in Django and FastAPI: A Practical Guide
Background Task Processing for Long Agents
For agents that take more than ~30 seconds, use background tasks:
```python
import json
import uuid

import redis.asyncio as aioredis
from fastapi import BackgroundTasks, Depends

from app.core.dependencies import get_redis  # yields a shared Redis client


@app.post("/api/agent/jobs")
async def create_agent_job(request: ChatRequest, background_tasks: BackgroundTasks):
    job_id = str(uuid.uuid4())
    # run_agent_job runs the agent and stores the result in Redis keyed by job ID
    background_tasks.add_task(run_agent_job, job_id, request)
    return {"job_id": job_id}


@app.get("/api/agent/jobs/{job_id}")
async def get_job_result(job_id: str, redis: aioredis.Redis = Depends(get_redis)):
    result = await redis.get(f"job:{job_id}")
    if result is None:
        return {"status": "pending"}
    return {"status": "completed", "result": json.loads(result)}
```
🔗 Related: Using Background Tasks for Heavy Operations in FastAPI
Cost Estimation
| Model | Typical agent cost per query |
|---|---|
| GPT-4o | $0.002–$0.02 (varies by tools used) |
| GPT-4o-mini | $0.0002–$0.002 |
| Claude 3.5 Sonnet | $0.003–$0.03 |
For production, always set usage limits and monitor costs via your provider's dashboard.
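Per-query cost is just tokens in and out multiplied by the provider's per-million-token prices. A small helper makes the table's ranges concrete; the token counts and prices below are illustrative assumptions, so check your provider's current pricing page:

```python
def query_cost(input_tokens: int, output_tokens: int,
               price_in_per_m: float, price_out_per_m: float) -> float:
    """Dollar cost of one query, given per-million-token prices."""
    return (input_tokens * price_in_per_m + output_tokens * price_out_per_m) / 1_000_000


# Example: an agent run with 3 tool-calling iterations might consume
# ~6,000 input tokens (prompt re-sent each iteration, plus tool output)
# and ~1,000 output tokens. With illustrative prices of $2.50/M input
# and $10/M output:
cost = query_cost(6_000, 1_000, price_in_per_m=2.50, price_out_per_m=10.0)
print(f"${cost:.4f}")  # $0.0250
```

Note how agents multiply input-token costs: every ReAct iteration re-sends the prompt plus all prior thoughts and observations, which is why agent queries cost noticeably more than single LLM calls.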
Conclusion: Next Steps — CrewAI, LangGraph, AutoGen
You've built a production-ready AI agent with LangChain and FastAPI. The patterns here — tool calling, memory, streaming, and async execution — apply across the agentic AI ecosystem.
What to explore next:
- LangGraph: LangChain's framework for building stateful, graph-based agent workflows. Better for complex multi-agent systems with conditional logic
- CrewAI: Framework for orchestrating multiple specialized agents working together on a task
- AutoGen: Microsoft's multi-agent conversation framework, good for agent-to-agent communication
- MCP: If you want your agents to integrate with AI assistants like Claude, see Building MCP Servers with Python: The Complete Guide for 2026
The key architectural decisions you made — exposing agents via FastAPI, using Redis for session memory, streaming responses — will serve you well as you move to more complex multi-agent systems.
🔗 Related: Building an AI-Powered Code Generator with OpenAI API | Running Lightweight Open-Source LLMs Locally
📌 Up next: Deploying Python APIs to Production: Railway, Fly.io, and Render